[Subversion] / Trellis / test_trellis.py  

View of /Trellis/test_trellis.py

Parent Directory | Revision Log
Revision: 2599 - (download) (as text)
Sun Feb 22 04:20:24 2009 UTC (15 years, 1 month ago) by pje
File size: 47268 byte(s)
Implement Grant's suggested sorting fixes
from test_sets import *
from peak import context
from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET
from peak.events import trellis, stm, collections, activity
from peak.util.decorators import rewrap, decorate as d
from peak.util.extremes import Max
import unittest, heapq, mocker, types, sys

try:
    import testreactor
except ImportError:
    testreactor = None  # either twisted or testreactor are missing
try:
    import wx
except ImportError:
    wx = None

class EventLoopTestCase(unittest.TestCase):
    def setUp(self):
        self.state = context.new()
        self.state.__enter__()
        super(EventLoopTestCase, self).setUp()
        self.configure_context()

    def tearDown(self):
        super(EventLoopTestCase, self).tearDown()
        self.state.__exit__(None, None, None)

    def configure_context(self):
        pass

class TestListener(stm.AbstractListener):
    def __repr__(self): return self.name
class TestSubject(stm.AbstractSubject):
    def __repr__(self): return self.name
class DummyError(Exception): pass
class UndirtyListener(TestListener):
    def dirty(self):
        return False


try:
    set
except NameError:
    from sets import Set as set

if wx:
    class TestWxEventLoop(EventLoopTestCase):
        def configure_context(self):
            from peak.events.activity import EventLoop, WXEventLoop
            EventLoop <<= WXEventLoop
            self.app = wx.PySimpleApp(redirect=False)
            self.frame = wx.Frame(None)

        def testSequentialCalls(self):
            log = []
            EventLoop.call(log.append, 1)
            EventLoop.call(log.append, 2)
            EventLoop.call(log.append, 3)
            EventLoop.call(log.append, 4)
            event = Time[0.00001]
            def c():
                if event:
                    # events aren't
                    EventLoop.call(EventLoop.stop)
            c = trellis.Cell(c)
            c.value

            # This will loop indefinitely, if sub-millisecond events aren't
            # rounded up to the next millisecond.
            EventLoop.run()
            self.frame.Destroy()
            self.assertEqual(log, [1,2,3,4])

            # XXX this should test more timing stuff, but the only way to do it
            #     is with a wx mock, which I haven't time for as yet.






if testreactor:

    class TestReactorEventLoop(EventLoopTestCase, testreactor.ReactorTestCase):

        def configure_context(self):
            from peak.events.activity import Time, EventLoop
            from twisted.internet import reactor
            Time <<= lambda: Time()
            Time.time = reactor.getTime
            EventLoop <<= TwistedEventLoop

        def testSequentialCalls(self):
            log = []
            EventLoop.call(log.append, 1)
            EventLoop.call(log.append, 2)
            EventLoop.call(log.append, 3)
            EventLoop.call(log.append, 4)

            class IdleTimer(trellis.Component):
                trellis.attrs(
                    idle_timeout = 20,
                    busy = False,
                )
                idle_for = trellis.maintain(
                    lambda self: self.idle_for.begins_with(not self.busy),
                    initially=NOT_YET
                )
                trellis.maintain()  # XXX should be perform
                def alarm(self):
                    if self.idle_for[self.idle_timeout] and EventLoop.running:
                        log.append(5)
                        EventLoop.stop()

            it = IdleTimer()
            EventLoop.run()
            self.assertEqual(log, [1,2,3,4,5])





class TestLinks(unittest.TestCase):

    def setUp(self):
        self.l1 = TestListener(); self.l1.name = 'l1'
        self.l2 = TestListener(); self.l1.name = 'l2'
        self.s1 = TestSubject(); self.s1.name = 's1'
        self.s2 = TestSubject(); self.s2.name = 's2'
        self.lk11 = stm.Link(self.s1, self.l1)
        self.lk12 = stm.Link(self.s1, self.l2)
        self.lk21 = stm.Link(self.s2, self.l1)
        self.lk22 = stm.Link(self.s2, self.l2)

    def verify_subjects(self, items):
        for link, nxt, prev in items:
            self.failUnless(link.next_subject is nxt)
            if isinstance(link,stm.Link):
                self.failUnless(link.prev_subject is prev)

    def verify_listeners(self, items):
        for link, nxt, prev in items:
            self.failUnless(link.next_listener is nxt)
            if isinstance(link,stm.Link):
                self.failUnless(link.prev_listener is prev)

    def testBreakIterSubjects(self):
        it = self.l1.iter_subjects()
        self.failUnless(it.next() is self.s2)
        self.lk21.unlink()
        self.failUnless(it.next() is self.s1)

    def testBreakIterListeners(self):
        it = self.s1.iter_listeners()
        self.failUnless(it.next() is self.l2)
        self.lk11.unlink()
        self.failUnless(it.next() is self.l1)






    def testLinkSetup(self):
        self.verify_subjects([
            (self.l1, self.lk21, None),   (self.l2, self.lk22, None),
            (self.lk21, self.lk11, None), (self.lk11, None, self.lk21),
            (self.lk22, self.lk12, None), (self.lk12, None, self.lk22),
        ])
        self.verify_listeners([
            (self.s1, self.lk12, None),      (self.s2, self.lk22, None),
            (self.lk22, self.lk21, self.s2), (self.lk21, None, self.lk22),
            (self.lk12, self.lk11, self.s1), (self.lk11, None, self.lk12),
        ])

    def testUnlinkListenerHeadSubjectTail(self):
        self.lk21.unlink()
        self.verify_subjects([
            (self.l1, self.lk11, None), (self.lk11, None, None)
        ])
        self.verify_listeners([
            (self.s2, self.lk22, None), (self.lk22, None, self.s2)
        ])

    def testUnlinkListenerTailSubjectHead(self):
        self.lk12.unlink()
        self.verify_subjects([
            (self.l2, self.lk22, None), (self.lk22, None, None),
        ])
        self.verify_listeners([
            (self.s1, self.lk11, None), (self.lk11, None, self.s1),
        ])



def a(f):
    def g(self):
        return self.ctrl.atomically(f, self)
    return rewrap(f, g)





class TestController(unittest.TestCase):

    def setUp(self):
        self.ctrl = stm.Controller()
        self.t0 = TestListener(); self.t0.name='t0';
        self.t1 = TestListener(); self.t1.name='t1'; self.t1.layer = 1
        self.t2 = TestListener(); self.t2.name='t2'; self.t2.layer = 2
        self.t3 = UndirtyListener(); self.t3.name='t3'
        self.s1 = TestSubject(); self.s2 = TestSubject()
        self.s1.name = 's1'; self.s2.name = 's2'

    def tearDown(self):
        # Verify correct cleanup in all scenarios
        for k,v in dict(
            undo=[], managers={}, queues={}, layers=[], reads={}, writes={},
            has_run={}, destinations=None, routes=None,
            current_listener=None, readonly=False, in_cleanup=False,
            active=False, at_commit=[], to_retry={}
        ).items():
            val = getattr(self.ctrl, k)
            self.assertEqual(val, v, '%s: %r' % (k,val))

    def testScheduleSimple(self):
        t1 = TestListener(); t1.name='t1'
        t2 = TestListener(); t2.name='t2'
        self.assertEqual(self.ctrl.layers, [])
        self.assertEqual(self.ctrl.queues, {})
        self.ctrl.schedule(t1)
        self.ctrl.schedule(t2)
        self.assertEqual(self.ctrl.layers, [0])
        self.assertEqual(self.ctrl.queues, {0: {t1:1, t2:1}})
        self.ctrl.cancel(t1)
        self.assertEqual(self.ctrl.layers, [0])
        self.assertEqual(self.ctrl.queues, {0: {t2:1}})
        self.ctrl.cancel(t2)
        # tearDown will assert that everything has been cleared

    def testThreadLocalController(self):
        self.failUnless(isinstance(trellis.ctrl, stm.Controller))
        self.failUnless(isinstance(trellis.ctrl, stm.threading.local))

    def testHeapingCancel(self):
        # verify that cancelling the last listener of a layer keeps
        # the 'layers' list in heap order
        self.ctrl.schedule(self.t0)
        self.ctrl.schedule(self.t2)
        self.ctrl.schedule(self.t1)
        layers = self.ctrl.layers
        self.assertEqual(layers, [0, 2, 1])
        self.ctrl.cancel(self.t0)
        self.assertEqual(heapq.heappop(layers), 1)
        self.assertEqual(heapq.heappop(layers), 2)
        self.assertEqual(self.ctrl.queues, {1: {self.t1:1}, 2: {self.t2:1}})
        self.ctrl.queues.clear()

    def testDoubleAndMissingCancelOrSchedule(self):
        self.ctrl.schedule(self.t2)
        self.ctrl.cancel(self.t0)
        self.ctrl.cancel(self.t2)
        self.ctrl.cancel(self.t2)
        self.ctrl.schedule(self.t1)
        self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})
        self.ctrl.schedule(self.t1)
        self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})
        self.ctrl.cancel(self.t1)

    def testScheduleLayerBump(self):
        # listener layer must be at least source layer + 1
        self.ctrl.schedule(self.t1)
        self.ctrl.schedule(self.t1, 0)
        self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})
        self.ctrl.schedule(self.t1, 1)
        self.assertEqual(self.ctrl.queues, {2: {self.t1:1}})
        self.assertEqual(self.t1.layer, 2)
        self.ctrl.cancel(self.t1)

    d(a)
    def testScheduleRollback(self):
        # when running atomically, scheduling is an undo-logged operation
        self.ctrl.schedule(self.t1)
        self.ctrl.rollback_to(0)

    def testCleanup(self):
        self.ctrl.schedule(self.t0)
        def raiser():
            # XXX need to actually run one rule, plus start another w/error
            raise DummyError
        try:
            self.ctrl.atomically(self.runAs, self.t0, raiser)
        except DummyError:
            pass

    def testSubjectsMustBeAtomic(self):
        self.assertRaises(AssertionError, self.ctrl.lock, self.s1)
        self.assertRaises(AssertionError, self.ctrl.used, self.s1)
        self.assertRaises(AssertionError, self.ctrl.changed, self.s1)

    d(a)
    def testLockAcquiresManager(self):
        class Dummy:
            def __enter__(*args): pass
            def __exit__(*args): pass
        mgr = self.s1.manager = Dummy()
        self.ctrl.lock(self.s1)
        self.assertEqual(self.ctrl.managers, {mgr:0})
        self.ctrl.lock(self.s2)
        self.assertEqual(self.ctrl.managers, {mgr:0})

    d(a)
    def testReadWrite(self):
        self.ctrl.used(self.s1)
        self.ctrl.changed(self.s2)
        self.assertEqual(self.ctrl.reads, {})
        self.assertEqual(self.ctrl.writes, {})
        self.ctrl.current_listener = self.t0
        self.ctrl.used(self.s1)
        self.ctrl.changed(self.s2)
        self.assertEqual(self.ctrl.reads, {self.s1:1})
        self.assertEqual(self.ctrl.writes, {self.s2:self.t0})
        self.ctrl.reads.clear()     # these would normally be handled by
        self.ctrl.writes.clear()    # the run() method's try/finally
        self.ctrl.current_listener = None   # reset

    d(a)
    def testNoReadDuringCommit(self):
        self.ctrl.readonly = True
        self.assertRaises(RuntimeError, self.ctrl.changed, self.s1)
        self.ctrl.readonly = False  # normally reset by ctrl.run_rule()

    d(a)
    def testRecalcOnWrite(self):
        stm.Link(self.s1, self.t0)
        stm.Link(self.s2, self.t1)
        stm.Link(self.s2, self.t0)
        self.ctrl.current_listener = self.t1
        self.ctrl.changed(self.s1)
        self.ctrl.changed(self.s2)
        self.assertEqual(self.ctrl.writes, {self.s1:self.t1, self.s2:self.t1})
        sp = self.ctrl.savepoint(); self.ctrl.has_run[self.t1] = self.t1
        self.ctrl._process_writes(self.t1)
        # Only t0 is notified, not t1, since t1 is the listener
        self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})
        self.ctrl.rollback_to(sp)
        self.ctrl.current_listener = None   # reset

    d(a)
    def testDependencyUpdatingAndUndo(self):
        stm.Link(self.s1, self.t0)
        s3 = TestSubject()
        stm.Link(s3, self.t0)
        self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])
        self.ctrl.current_listener = self.t0
        self.ctrl.used(self.s1)
        self.ctrl.used(self.s2)
        sp = self.ctrl.savepoint()
        self.ctrl._process_reads(self.t0)
        self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])
        self.ctrl.rollback_to(sp)
        self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])
        self.ctrl.current_listener = None   # reset




    def runAs(self, listener, rule):
        listener.run = rule
        self.ctrl.run_rule(listener)

    d(a)
    def testIsRunningAndHasRan(self):
        def rule():
            self.assertEqual(self.ctrl.current_listener, self.t1)
            self.assertEqual(self.ctrl.has_run, {self.t1: 0})
        sp = self.ctrl.savepoint()
        self.runAs(self.t1, rule)
        self.assertEqual(self.ctrl.current_listener, None)
        self.assertEqual(self.ctrl.has_run, {self.t1: 0})

    d(a)
    def testIsRunningButHasNotRan(self):
        def rule():
            self.assertEqual(self.ctrl.current_listener, self.t1)
            self.assertEqual(self.ctrl.has_run, {})
        sp = self.ctrl.savepoint()
        self.t1.run = rule; self.ctrl.initialize(self.t1)    # uninit'd rule
        self.assertEqual(self.ctrl.current_listener, None)
        self.assertEqual(self.ctrl.has_run, {})

    d(a)
    def testScheduleUndo(self):
        sp = self.ctrl.savepoint()
        self.ctrl.schedule(self.t2)
        self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})
        self.ctrl.rollback_to(sp)
        self.assertEqual(self.ctrl.queues, {})

    def testNestedReadOnly(self):
        log = []
        def aRule():
            log.append(trellis.ctrl.readonly); return 1
        c1 = trellis.Cell(aRule)
        c2 = trellis.Cell(lambda: c1.value * aRule())
        c3 = trellis.Performer(lambda: c2.value)
        self.assertEqual(log, [True, True])        

    d(a)
    def testWriteProcessingInRun(self):
        stm.Link(self.s1, self.t0)
        stm.Link(self.s2, self.t1)
        stm.Link(self.s2, self.t3)
        stm.Link(self.s2, self.t0)
        def rule():
            self.ctrl.changed(self.s1)
            self.ctrl.changed(self.s2)
            self.assertEqual(self.ctrl.writes, {self.s1:self.t1, self.s2:self.t1})
        self.runAs(self.t1, rule)
        # Only t0 is notified, not t1, since t1 is the listener & t3 is !dirty
        self.assertEqual(self.ctrl.writes, {})
        self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})
        self.ctrl.cancel(self.t0)

    d(a)
    def testReadProcessingInRun(self):
        stm.Link(self.s1, self.t0)
        s3 = TestSubject()
        stm.Link(s3, self.t0)
        self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])
        def rule():
            self.ctrl.used(self.s1)
            self.ctrl.used(self.s2)
            self.assertEqual(self.ctrl.reads, {self.s1:1, self.s2:1})
        self.runAs(self.t0, rule)
        self.assertEqual(self.ctrl.reads, {})
        self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])

    d(a)
    def testReadOnlyDuringMax(self):
        def rule():
            self.assertEqual(self.ctrl.readonly, True)
        self.t0.layer = Max
        self.assertEqual(self.ctrl.readonly, False)
        self.runAs(self.t0, rule)
        self.assertEqual(self.ctrl.readonly, False)



    d(a)
    def testRunClearsReadWriteOnError(self):
        def rule():
            self.ctrl.used(self.s1)
            self.ctrl.changed(self.s2)
            self.assertEqual(self.ctrl.reads, {self.s1:1})
            self.assertEqual(self.ctrl.writes, {self.s2:1})
            try:
                self.runAs(self.t0, rule)
            except DummyError:
                pass
            else:
                raise AssertionError("Error should've propagated")
        self.assertEqual(self.ctrl.reads, {})
        self.assertEqual(self.ctrl.writes, {})

    d(a)
    def testSimpleCycle(self):
        stm.Link(self.s1, self.t1)
        stm.Link(self.s2, self.t2)
        def rule0():
            self.ctrl.used(self.s1)
            self.ctrl.changed(self.s1)
        def rule1():
            self.ctrl.used(self.s1)
            self.ctrl.changed(self.s2)
        def rule2():
            self.ctrl.used(self.s2)
            self.ctrl.changed(self.s1)
        self.runAs(self.t0, rule0)
        self.runAs(self.t1, rule1)
        self.runAs(self.t2, rule2)
        try:
            self.ctrl._retry()
        except stm.CircularityError, e:
            self.assertEqual(e.args[0], {self.t0: set([self.t1]),
                self.t1: set([self.t2]), self.t2: set([self.t0, self.t1])})
            self.assertEqual(e.args[1], (self.t1, self.t2, self.t0))
        else:
            raise AssertionError("Should've caught a cycle")

    d(a)
    def testSimpleRetry(self):
        def rule():
            pass
        self.runAs(self.t0, rule)
        self.runAs(self.t1, rule)
        self.runAs(self.t2, rule)
        self.assertEqual(set(self.ctrl.has_run),set([self.t0,self.t1,self.t2]))
        self.ctrl.to_retry[self.t1]=1
        self.ctrl._retry()
        self.assertEqual(set(self.ctrl.has_run), set([self.t0]))
        self.ctrl.to_retry[self.t0]=1
        self.ctrl._retry()


    d(a)
    def testNestedNoRetry(self):
        def rule0():
            self.t1.run=rule1; self.ctrl.initialize(self.t1)
        def rule1():
            pass
        self.runAs(self.t2, rule1)
        self.runAs(self.t0, rule0)
        self.ctrl.schedule(self.t1)
        self.assertEqual(self.ctrl.to_retry, {})
        self.assertEqual(
            set(self.ctrl.has_run), set([self.t0, self.t2])
        )
        self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})


    def testRunScheduled(self):
        log = []
        self.t1.run = lambda: log.append(True)
        def go():
            self.ctrl.schedule(self.t1)
        self.ctrl.atomically(go)
        self.assertEqual(log, [True])



    def testRollbackReschedules(self):
        sp = []
        def rule0():
            self.ctrl.rollback_to(sp[0])
            self.assertEqual(self.ctrl.queues, {0: {self.t0:1}})
            self.ctrl.cancel(self.t0)
        self.t0.run = rule0
        def go():
            self.ctrl.schedule(self.t0)
            sp.append(self.ctrl.savepoint())
        self.ctrl.atomically(go)

    def testManagerCantCreateLoop(self):
        class Mgr:
            def __enter__(self): pass
            def __exit__(*args):
                self.ctrl.schedule(self.t1)
        log = []
        def rule1():
            log.append(True)
        self.t1.run = rule1
        self.t0.run = lambda:self.ctrl.manage(Mgr())
        self.ctrl.atomically(self.ctrl.schedule, self.t0)
        self.assertEqual(log, [])
        self.ctrl.atomically(lambda:None)
        self.assertEqual(log, [True])

    d(a)
    def testNotifyOnChange(self):
        stm.Link(self.s2, self.t2)
        stm.Link(self.s2, self.t3)
        self.ctrl.changed(self.s2)
        self.ctrl.current_listener = self.t0
        self.ctrl.changed(self.s2)
        self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})
        self.ctrl.cancel(self.t2)
        self.ctrl.writes.clear()
        self.ctrl.current_listener = None   # reset



    def testCommitCanLoop(self):
        log=[]
        def go():
            log.append(True)
        self.t0.run = go
        self.ctrl.atomically(self.ctrl.on_commit, self.ctrl.schedule, self.t0)
        self.assertEqual(log,[True])

    d(a)
    def testNoUndoDuringUndo(self):
        def undo():
            self.ctrl.on_undo(redo)
        def redo():
            raise AssertionError("Should not be run")
        self.ctrl.on_undo(undo)
        self.ctrl.rollback_to(0)

    d(a)
    def testReentrantRollbackToMinimumTarget(self):
        sp = self.ctrl.savepoint()
        # these 2 rollbacks will be ignored, since they target a higher sp.
        # note that both are needed for testing, as one is there to potentially
        # set a new target, and the other is there to make the offset wrong if
        # the rollback stops prematurely.
        self.ctrl.on_undo(self.ctrl.rollback_to, sp+100)
        self.ctrl.on_undo(self.ctrl.rollback_to, sp+100)
        sp2 = self.ctrl.savepoint()

        # ensure that there's no way this test can pass unless rollback_to
        # notices re-entrant invocations (because it would overflow the stack)
        for i in range(sys.getrecursionlimit()*2):
            # request a rollback all the way to 0; this target should be used
            # in place of the sp2 target or sp+100 targets, since it will be
            # the lowest target encountered during the rollback.
            self.ctrl.on_undo(self.ctrl.rollback_to, sp)

        self.ctrl.rollback_to(sp2) # ask to rollback to posn 2
        self.assertEqual(self.ctrl.savepoint(), sp)  # but should rollback to 0



    d(a)
    def testNestedRule(self):
        def rule1():
            self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
            self.assertEqual(self.ctrl.current_listener, self.t1)
            self.ctrl.used(self.s1)
            self.ctrl.changed(self.s2)
            self.assertEqual(self.ctrl.reads, {self.s1:1})
            self.assertEqual(self.ctrl.writes, {self.s2:self.t1})
            self.t2.run=rule2; self.ctrl.initialize(self.t2)
            self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
            self.assertEqual(self.ctrl.current_listener, self.t1)
            self.assertEqual(self.ctrl.reads, {self.s1:1})
            self.assertEqual(self.ctrl.writes, {self.s2:self.t1, s3:self.t2})

        def rule2():
            self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
            self.assertEqual(self.ctrl.current_listener, self.t2)
            self.assertEqual(self.ctrl.reads, {})
            self.assertEqual(self.ctrl.writes, {self.s2:self.t1})
            self.ctrl.used(self.s2)
            self.ctrl.changed(s3)

        def rule0():
            pass

        s3 = TestSubject(); s3.name = 's3'
        self.runAs(self.t0, rule0)
        self.runAs(self.t1, rule1)
        self.assertEqual(
            set(self.ctrl.has_run),
            set([self.t1, self.t0])  # t2 was new, so doesn't show
        )
        self.assertEqual(list(self.t1.iter_subjects()), [self.s1])
        self.assertEqual(list(self.t2.iter_subjects()), [self.s2])
        self.ctrl.rollback_to(self.ctrl.has_run[self.t1])  # should undo both t1/t2





    def testUndoLogSpansMultipleRecalcs(self):
        c1 = trellis.Value(False, discrete=True)
        c2 = trellis.Cell(lambda: (c1.value, log.append(trellis.savepoint())))
        log = []; c2.value; log = []; c1.value = True
        self.failUnless(len(log)==2 and log[1]>log[0], log)

    def testUndoPostCommitCancelsUndoOfCommitSchedule(self):
        c1 = trellis.Value(False, discrete=True)
        def c2():
            c1.value
            log.append(trellis.savepoint())
            if len(log)==2:
                raise DummyError
        c2 = trellis.Cell(c2)
        log = []; c2.value; log = [];
        # This will raise a different error if undoing the on-commit stack
        # causes an underflow:
        self.assertRaises(DummyError, setattr, c1, 'value', True)


class TestTime(unittest.TestCase):

    def testIndependentNextEventTime(self):
        # Ensure that next_event_time() never returns a *past* time
        t = Time()
        t.auto_update = False
        t20 = t[20]
        t40 = t[40]
        d(trellis.Cell)
        def check_reached():
            t.reached(t20)
            t.reached(t40)
            nt = t.next_event_time(True)
            self.failIf(nt is not None and nt<=0)
        check_reached.value
        t.advance(25)
        t.advance(15)




class TestCells(mocker.MockerTestCase):

    ctrl = stm.ctrl
    def tearDown(self):
        # make sure the old controller is back
        trellis.install_controller(self.ctrl)

    def testValueBasics(self):
        self.failUnless(issubclass(trellis.Value, trellis.AbstractCell))
        self.failUnless(issubclass(trellis.Value, stm.AbstractSubject))
        v = trellis.Value()
        self.assertEqual(v.value, None)
        self.assertEqual(v._set_by, trellis._sentinel)
        self.assertEqual(v._reset, trellis._sentinel)
        v.value = 21
        self.assertEqual(v._set_by, trellis._sentinel)

    d(a)
    def testValueUndo(self):
        v = trellis.Value(42)
        self.assertEqual(v.value, 42)
        sp = self.ctrl.savepoint()
        v.value = 43
        self.assertEqual(v.value, 43)
        self.ctrl.rollback_to(sp)
        self.assertEqual(v.value, 42)

    d(a)
    def testValueUsed(self):
        v = trellis.Value(42)
        ctrl = self.mocker.replace(self.ctrl) #'peak.events.stm.ctrl')
        ctrl.used(v)
        self.mocker.replay()
        trellis.install_controller(ctrl)
        self.assertEqual(v.value, 42)

    def testDiscrete(self):
        v = trellis.Value(None, True)
        v.value = 42
        self.assertEqual(v.value, None)

    def testValueChanged(self):
        v = trellis.Value(42)
        old_ctrl, ctrl = self.ctrl, self.mocker.replace(self.ctrl)
        ctrl.lock(v)
        ctrl.changed(v)
        self.mocker.replay()
        trellis.install_controller(ctrl)
        v.value = 43
        self.assertEqual(v.value, 43)

    def testValueUnchanged(self):
        v = trellis.Value(42)
        ctrl = self.mocker.replace(self.ctrl)
        ctrl.lock(v)
        mocker.expect(ctrl.changed(v)).count(0)
        self.mocker.replay()
        trellis.install_controller(ctrl)
        v.value = 42
        self.assertEqual(v.value, 42)

    d(a)
    def testValueSetLock(self):
        v = trellis.Value(42)
        v.value = 43
        self.assertEqual(v.value, 43)
        self.assertEqual(v._set_by, None)
        def go():
            v.value = 99
        t = TestListener(); t.name = 't'
        t.run = go
        self.assertRaises(trellis.InputConflict, self.ctrl.run_rule, t)
        self.assertEqual(v.value, 43)
        def go():
            v.value = 43
        t = TestListener(); t.name = 't'
        t.run = go
        self.ctrl.run_rule(t)
        self.assertEqual(v.value, 43)



    def testReadOnlyCellBasics(self):
        log = []
        c = trellis.Cell(lambda:log.append(1))
        self.failUnless(type(c) is trellis.ReadOnlyCell)
        c.value
        self.assertEqual(log,[1])
        c.value
        self.assertEqual(log,[1])

    def testDiscreteValue(self):
        log = []
        v = trellis.Value(False, True)
        c = trellis.Cell(lambda: log.append(v.value))
        self.assertEqual(log,[])
        c.value
        self.assertEqual(log,[False])
        del log[:]
        v.value = True
        self.assertEqual(log, [True, False])
        self.assertEqual(v.value, False)
        del log[:]
        v.value = False
        self.assertEqual(log, [])

    def testCellConstructor(self):
        self.failUnless(type(trellis.Cell(value=42)) is trellis.Value)
        self.failUnless(type(trellis.Cell(lambda:42)) is trellis.ReadOnlyCell)
        self.failUnless(type(trellis.Cell(lambda:42, value=42)) is trellis.Cell)

    def testRuleChain(self):
        v = trellis.Value(0)
        log = []
        c1 = trellis.Cell(lambda:int(v.value/2))
        c2 = trellis.Cell(lambda:log.append(c1.value))
        c2.value
        self.assertEqual(log, [0])
        v.value = 1
        self.assertEqual(log, [0])
        v.value = 2
        self.assertEqual(log, [0, 1])

    def testConstant(self):
        for v in (42, [57], "blah"):
            c = trellis.Constant(v)
            self.assertEqual(c.value, v)
            self.assertEqual(c.get_value(), v)
            self.failIf(hasattr(c,'set_value'))
            self.assertRaises(AttributeError, setattr, c, 'value', v)
            self.assertEqual(repr(c), "Constant(%r)" % (v,))

    def testRuleToConstant(self):
        log = []
        def go():
            log.append(1)
            return 42
        c = trellis.Cell(go)
        self.assertEqual(c.value, 42)
        self.assertEqual(log, [1])
        self.failUnless(isinstance(c, trellis.ConstantRule))
        self.assertEqual(repr(c), "Constant(42)")
        self.assertEqual(c.value, 42)
        self.assertEqual(c.get_value(), 42)
        self.assertEqual(c.rule, None)
        self.assertEqual(log, [1])
        self.failIf(c.dirty())
        c.__class__ = trellis.ReadOnlyCell  # transition must be reversible to undo
        self.failIf(isinstance(c, trellis.ConstantRule))

    def testModifierIsAtomic(self):
        log = []
        d(trellis.modifier)
        def do_it():
            self.failUnless(self.ctrl.active)
            self.assertEqual(self.ctrl.current_listener, None)
            log.append(True)
            return log
        rv = do_it()
        self.failUnless(rv is log)
        self.assertEqual(log, [True])



    d(a)
    def testModifierAlreadyAtomic(self):
        log = []
        d(trellis.modifier)
        def do_it():
            self.failUnless(self.ctrl.active)
            self.assertEqual(self.ctrl.current_listener, None)
            log.append(True)
            return log
        rv = do_it()
        self.failUnless(rv is log)
        self.assertEqual(log, [True])

    d(a)
    def testModifierFromCell(self):
        v1, v2 = trellis.Value(42), trellis.Value(99)
        d(trellis.modifier)
        def do_it():
            v1.value = v1.value * 2
            self.assertEqual(self.ctrl.reads, {v1:1})
        def rule():
            v2.value
            do_it()
            self.assertEqual(self.ctrl.reads, {v2:1})
        trellis.Cell(rule).value
        self.assertEqual(v1.value, 84)

    def testDiscreteToConstant(self):
        log = []
        c1 = trellis.ReadOnlyCell(lambda:True, False, True)
        c2 = trellis.Cell(lambda:log.append(c1.value))
        c2.value
        self.assertEqual(log, [True, False])
        self.failUnless(isinstance(c1, trellis.ConstantRule))







    def testReadWriteCells(self):
        C = trellis.Cell(lambda: (F.value-32) * 5.0/9, -40)
        F = trellis.Cell(lambda: (C.value * 9.0)/5 + 32, -40)
        self.assertEqual(C.value, -40)
        self.assertEqual(F.value, -40)
        C.value = 0
        self.assertEqual(C.value, 0)
        self.assertEqual(F.value, 32)

    def testSelfDependencyDoesNotIncreaseLayer(self):
        c1 = trellis.Value(23)
        c2 = trellis.Cell(lambda: c1.value + c2.value, 0)
        self.assertEqual(c2.value, 23)
        self.assertEqual(c2.layer, 1)
        c1.value = 19
        self.assertEqual(c2.value, 42)
        self.assertEqual(c2.layer, 1)

    def testSettingOccursForEqualObjects(self):
        d1 = {}; d2 = {}
        c1 = trellis.Value()
        c1.value = d1
        self.failUnless(c1.value is d1)
        c1.value = d2
        self.failUnless(c1.value is d2)

    def testRepeat(self):
        def counter():
            if counter.value == 10:
                return counter.value
            trellis.repeat()
            return counter.value + 1
        counter = trellis.ReadOnlyCell(counter, 1)
        self.assertEqual(counter.value, 10)







    d(a)
    def testPartialRollbackList(self):
        c1 = trellis.Cell(value=42)
        l = trellis.List()
        l.append(1)
        self.assertEqual(l.future, [1])
        sp = self.ctrl.savepoint()
        self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
        l.append(2)
        self.assertEqual(l.future, [1, 2])
        self.ctrl.rollback_to(sp)
        self.assertEqual(l.future, [1])

    d(a)
    def testPartialRollbackDict(self):
        c1 = trellis.Cell(lambda:None)
        d = trellis.Dict()
        d[1] = 2
        self.assertEqual(d.added, {1:2})
        sp = self.ctrl.savepoint()
        self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
        d[2] = 3
        self.assertEqual(d.added, {1:2, 2:3})
        self.ctrl.rollback_to(sp)
        self.assertEqual(d.added, {1:2})

    d(a)
    def testPartialRollbackSet(self):
        c1 = trellis.Cell(lambda:None)
        s = trellis.Set()
        s.add(1)
        self.assertEqual(list(s.added), [1])
        sp = self.ctrl.savepoint()
        self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
        s.add(2)
        self.assertEqual(list(s.added), [1, 2])
        self.ctrl.rollback_to(sp)
        self.assertEqual(list(s.added), [1])



    def testSetAfterSchedule(self):
        def A():
            B.value
            C.value
        
        A = trellis.Cell(A, None)
        
        d(trellis.Cell)
        def B():
            A.value = C.value
        
        C = trellis.Value()
        
        A.value
        C.value = 1

    def run_modifier_and_rule(self, func, rule):
        d(self.ctrl.atomically)
        def go():
            self.ctrl.schedule(trellis.Cell(rule))
            func.sp = self.ctrl.savepoint()
            trellis.modifier(func)()



















    def _testNonterminatingXXX(self):

        class InfiniteLoop(trellis.Component):
            trellis.attrs(v1=False, v2=False)
        
            trellis.maintain()
            def a(self):
                #print 'A'
                if self.v1:
                    self.v2
                    return True
        
            trellis.maintain()
            def b(self):
                #print 'B'
                if self.v1:
                    self.a
                    self.v2 = True
        
        comp = InfiniteLoop()
        comp.v1 = True




















    def _testFalseCircularityXXX(self):

        class FalseCircularity(trellis.Component):
            trellis.attrs(v1=False, v2=False)
        
            trellis.maintain()
            def a(self):
                if self.v1:
                    self.v2
                    return True
        
            trellis.maintain()
            def b(self):
                if self.v1:
                    self.v2 = True
                else:
                    self.a
        
        comp = FalseCircularity()
        comp.v1 = True

        # there's no actual circularity, but the correct order can't be found



















    def testSensorRollback(self):
        connector = trellis.Connector(
            connect=lambda sensor:None, disconnect=lambda sensor, key: None
        )

        sensor = trellis.Cell(connector)
        
        class SensorInitUndoTest(trellis.Component):
            trellis.attrs(v1=False)
        
            trellis.maintain()
            def a(self):
                if self.v1:
                    return _Comp()
        
            trellis.maintain()
            def b(self):
                if self.v1:
                    self.a
        
        class _Comp(trellis.Component):
            trellis.maintain()
            def c(self):
                sensor.value
               
        comp = SensorInitUndoTest()
        comp.__cells__['a'].layer =  comp.__cells__['b'].layer + 1
        comp.v1 = True
        self.failUnless(sensor.next_listener() is comp.a.__cells__['c'])
        self.failUnless(sensor.listening is not trellis.NOT_GIVEN)











    def testSetShouldOverrideInitialCalculatedValue(self):
        class C(trellis.Component):
            trellis.maintain(optional=True)
            def calc(self):
                return 0
            trellis.maintain()
            def getx(self):
                self.calc        
            trellis.maintain()
            def set(self):
                # This should not conflict with .calc setting itself to 0
                self.calc = 1   
            def __init__(self):
                self.getx
                self.set        
        c = C()

    def testMakeDuringPerform(self):
        class C1(trellis.Component):
            x = trellis.attr()        
            trellis.maintain()
            def rule(self):
                self.x = 1
        
        class C2(trellis.Component):
            c1 = trellis.make(C1)        
            trellis.compute()
            def calc(self):
                return self.c1.x
        C2().calc

    def __testMaintainReassign(self):
        class C(trellis.Component):
            x = trellis.attr()
            trellis.maintain()
            def rule(self):
                self.x = 10
        d(trellis.atomically)
        def test():
            C(x = 1)

    def testFalsePositiveDepCycle(self):

        c1 = trellis.Cell(value=1)

        d(trellis.Cell)
        def c2():
            return c1.value+1

        d(trellis.Cell)
        def c3():
            return c1.value+c2.value

        self.assertEqual(c3.value, 3)

        d(trellis.Cell)
        def c5():
            c1.value = 27            
    
        d(trellis.atomically)
        def doit():
            c5.value
            for c in c2, c3:
                trellis.ctrl.has_run.setdefault(c, 1)
                trellis.on_undo(trellis.ctrl.has_run.pop, c)
                trellis.ctrl.to_retry.setdefault(c, 1)
            trellis.on_undo(trellis.ctrl._unrun, c2, [c3])
            trellis.ctrl._retry()














class TestDefaultEventLoop(unittest.TestCase):

    def setUp(self):
        self.loop = EventLoop()
        self.ctrl = trellis.ctrl
    
    def testCallAndPoll(self):
        log = []
        self.loop.call(log.append, 1)
        self.loop.call(log.append, 2)
        self.assertEqual(log, [])
        self.loop.poll()
        self.assertEqual(log, [1])
        self.loop.poll()
        self.assertEqual(log, [1, 2])
        self.loop.poll()
        self.assertEqual(log, [1, 2])

    d(a)
    def testLoopIsNonAtomic(self):
        self.assertRaises(AssertionError, self.loop.poll)
        self.assertRaises(AssertionError, self.loop.flush)
        self.assertRaises(AssertionError, self.loop.run)

    def testCallAndFlush(self):
        log = []
        self.loop.call(log.append, 1)
        self.loop.call(log.append, 2)
        self.loop.call(self.loop.call, log.append, 3)
        self.assertEqual(log, [])
        self.loop.flush()
        self.assertEqual(log, [1, 2])
        self.loop.poll()
        self.assertEqual(log, [1, 2, 3])
        self.loop.poll()
        self.assertEqual(log, [1, 2, 3])





    def testUndoOfCall(self):
        log = []
        def do():
            self.loop.call(log.append, 1)
            sp = self.ctrl.savepoint()
            self.loop.call(log.append, 2)
            self.ctrl.rollback_to(sp)
            self.loop.call(log.append, 3)
        self.ctrl.atomically(do)
        self.assertEqual(log, [])
        self.loop.flush()
        self.assertEqual(log, [1, 3])
        
    def testScheduleUndo(self):
        t = Time()
        t.auto_update = False
        t20 = t[20]
        log = []
        d(trellis.Cell)
        def checktime():
            t.reached(t20)
            log.append(t._events[t20._when])
            d(trellis.Performer)
            def err_after_reached():
                if len(t._schedule)>1:
                    raise DummyError
        self.assertRaises(DummyError, checktime.get_value)
        self.assertEqual(t._schedule, [t20._when, Max])
        self.assertEqual(dict(t._events), {t20._when:log[0]})
        del checktime
        self.failUnless(isinstance(log.pop(), trellis.Sensor))
        self.assertEqual(dict(t._events), {})
        self.assertEqual(log, [])

    def force_rollback(self):
        d(trellis.Performer)
        def do_it():
            raise DummyError



    def testUpdateUndo(self):
        t = Time()
        t.auto_update = False
        t20 = t[20]
        d(trellis.Cell)
        def checktime():
            if t.reached(t20):
                self.force_rollback()
        checktime.value
        self.assertEqual(t._schedule, [t20._when, Max])
        self.assertEqual(list(t._events), [t20._when])
        self.assertRaises(DummyError, t.advance, 20)
        self.assertEqual(t._schedule, [t20._when, Max])
        self.assertEqual(list(t._events), [t20._when])



























class TestTasks(unittest.TestCase):

    ctrl = trellis.ctrl

    def testRunAtomicallyInLoop(self):
        log = []
        def f():
            self.failUnless(self.ctrl.active)
            log.append(1)
            yield activity.Pause
            self.failUnless(self.ctrl.active)
            log.append(2)
        t = activity.TaskCell(f)
        self.assertEqual(log, [])
        t._loop.flush()
        self.assertEqual(log, [1])
        t._loop.flush()
        self.assertEqual(log, [1, 2])

    def testDependencyAndCallback(self):
        log = []
        v = trellis.Value(42)
        v1 = trellis.Value(1)
        c1 = trellis.Cell(lambda: v1.value*2)
        def f():
            while v.value:
                log.append(v.value)
                v1.value = v.value
                yield activity.Pause
        t = activity.TaskCell(f)
        check = []
        for j in 42, 57, 99, 106, 23, None:
            self.assertEqual(log, check)
            v.value = j
            if j: check.append(j)
            for i in range(5):
                t._loop.flush()
                if j: self.assertEqual(c1.value, j*2)
                self.assertEqual(log, check)
        

    def testPauseAndCall(self):
        log = []
        class TaskExample(trellis.Component):
            trellis.attrs(
                start = False,
                stop = False
            )
        
            def wait_for_start(self):
                log.append("waiting to start")
                while not self.start:
                    yield activity.Pause
        
            def wait_for_stop(self):
                while not self.stop:
                    log.append("waiting to stop")
                    yield activity.Pause
        
            activity.task()
            def demo(self):
                yield self.wait_for_start()
                log.append("starting")
                yield self.wait_for_stop()
                log.append("stopped")
        
        self.assertEqual(log, [])
        t = TaskExample()
        EventLoop.flush()
        self.assertEqual(log, ['waiting to start'])
        log.pop()
        t.start = True
        EventLoop.flush()
        self.assertEqual(log, ['starting', 'waiting to stop'])
        log.pop()
        log.pop()
        t.stop = True
        EventLoop.flush()
        self.assertEqual(log, ['stopped'])



    def testValueReturns(self):
        log = []
        def f1():
            yield 42
        def f2():
            yield f1(); yield activity.resume()
        def f3():
            yield f2(); v = activity.resume()
            log.append(v)

        t = activity.TaskCell(f3)
        EventLoop.flush()
        self.assertEqual(log, [42])

        log = []
        def f1():
            yield activity.Return(42)

        t = activity.TaskCell(f3)
        EventLoop.flush()
        self.assertEqual(log, [42])


    def testErrorPropagation(self):
        log = []
        def f1():
            raise DummyError
        def f2():
            try:
                yield f1(); activity.resume()
            except DummyError:
                log.append(True)
            else:
                pass
            
        t = activity.TaskCell(f2)
        self.assertEqual(log, [])
        EventLoop.flush()
        self.assertEqual(log, [True])

        
    def testSendAndThrow(self):
        log = []
        class SendThrowIter(object):
            count = 0
            def next(self):
                if self.count==0:
                    self.count = 1
                    def f(): yield 99
                    return f()
                raise StopIteration

            def send(self, value):
                log.append(value)
                def f(): raise DummyError; yield None
                return f()                    

            def throw(self, typ, val, tb):
                log.append(typ)
                log.append(val.__class__)   # type(val) is instance in Py<2.5
                log.append(type(tb))
                raise StopIteration                

        def fs(): yield SendThrowIter()
        t = activity.TaskCell(fs)
        self.assertEqual(log, [])
        EventLoop.flush()
        self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType])
        













    def testResumeOnlyOnceUntilFlushed(self):        
        log = []
        c1 = trellis.Value(1)
        c2 = trellis.Value(2)
        def f():
            for i in range(3):
                c1.value, c2.value
                log.append(i)
                yield activity.Pause
            
        t = activity.TaskCell(f)
        self.assertEqual(log, [])
        EventLoop.flush()
        self.assertEqual(log, [0])
        c1.value = 3
        self.assertEqual(log, [0])
        c2.value = 4
        EventLoop.flush()
        self.assertEqual(log, [0, 1])
        
        




















    def testNoTodoRollbackIntoTask(self):
        class CV(trellis.Component):
            v = trellis.attr(False)
            s = trellis.make(trellis.Set)
        
            trellis.maintain()
            def maintain(self):
                if self.v:
                    self.s.add(1)
                else:
                    self.s.discard(1)
        
            trellis.perform()
            def perform(s):
                self.assertEqual(s.v, True)
                
        d(activity.TaskCell)
        def task():
            cv = CV()
            cv.v = True
            yield activity.Pause
        
        EventLoop.run()


















class SortedSetTestCase(unittest.TestCase):

    def testUnicodeSort(self):
        data = trellis.Set([1, 2])
        sorted_set = collections.SortedSet(
            data=data, sort_key=unicode, reverse=True
        )

        self.failUnlessEqual(list(sorted_set), [2, 1])
        data.add(0)
        self.failUnlessEqual(list(sorted_set), [2, 1, 0])

    def testStrSort(self):
        data = trellis.Set([1, 3, 4])
        sorted_set = collections.SortedSet(data=data, sort_key=str)

        self.failUnlessEqual(list(sorted_set), [1, 3, 4])
        data.add(2)
        self.failUnlessEqual(list(sorted_set), [1, 2, 3, 4])

    def testRemoveLast(self):
        data = trellis.Set([1, 2])
        sorted_set = collections.SortedSet(data=data)
        data.remove(2)
        self.failUnlessEqual(list(sorted_set), [1])

    def testAddToEnd(self):
        data = trellis.Set([1])
        sorted_set = collections.SortedSet(data=data)
        data.add(2)
        self.failUnlessEqual(list(sorted_set), [1, 2])










def additional_tests():
    import doctest, sys
    files = [
        'README.txt', 'STM-Observer.txt', 'Activity.txt', 'Collections.txt',
        'Internals.txt',
    ][(sys.version<'2.4')*3:]   # All but Internals+Collections use decorator syntax
    try:
        from sqlalchemy.orm.attributes import ClassManager
    except ImportError:
        pass
    else:
        files.insert(0, 'SQLAlchemy.txt')
    return doctest.DocFileSuite(
        optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files
    )



























cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help