[Subversion] / Trellis / test_trellis.py  

Diff of /Trellis/test_trellis.py

Parent Directory | Revision Log

version 2436, Fri Nov 30 22:46:43 2007 UTC version 2595, Mon Nov 3 23:22:37 2008 UTC
Line 1 
Line 1 
 from test_sets import *  from test_sets import *
 from peak import context  from peak import context
 from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET  from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET
 from peak.events import trellis, stm  from peak.events import trellis, stm, collections, activity
 from peak.util.decorators import rewrap, decorate as d  from peak.util.decorators import rewrap, decorate as d
 from peak.util.extremes import Max  from peak.util.extremes import Max
 import unittest, heapq, mocker, types  import unittest, heapq, mocker, types, sys
   
 try:  try:
     import testreactor      import testreactor
Line 39 
Line 39 
         return False          return False
   
   
   try:
       set
   except NameError:
       from sets import Set as set
   
 if wx:  if wx:
     class TestWxEventLoop(EventLoopTestCase):      class TestWxEventLoop(EventLoopTestCase):
         def configure_context(self):          def configure_context(self):
             from peak.events.activity import EventLoop, WXEventLoop              from peak.events.activity import EventLoop, WXEventLoop
             EventLoop <<= WXEventLoop              EventLoop <<= WXEventLoop
             self.app = wx.PySimpleApp(redirect=False)              self.app = wx.PySimpleApp(redirect=False)
             self.app.ExitOnFrameDelete = False              self.frame = wx.Frame(None)
   
         def testSequentialCalls(self):          def testSequentialCalls(self):
             log = []              log = []
Line 53 
Line 58 
             EventLoop.call(log.append, 2)              EventLoop.call(log.append, 2)
             EventLoop.call(log.append, 3)              EventLoop.call(log.append, 3)
             EventLoop.call(log.append, 4)              EventLoop.call(log.append, 4)
               event = Time[0.00001]
               def c():
                   if event:
                       # events aren't
             EventLoop.call(EventLoop.stop)              EventLoop.call(EventLoop.stop)
               c = trellis.Cell(c)
               c.value
   
               # This will loop indefinitely, if sub-millisecond events aren't
               # rounded up to the next millisecond.
             EventLoop.run()              EventLoop.run()
               self.frame.Destroy()
             self.assertEqual(log, [1,2,3,4])              self.assertEqual(log, [1,2,3,4])
   
             # XXX this should test timing stuff, but the only way to do that              # XXX this should test more timing stuff, but the only way to do it
             #     is with a wx mock, which I haven't time for as yet.              #     is with a wx mock, which I haven't time for as yet.
   
   
Line 65 
Line 80 
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
 if testreactor:  if testreactor:
   
     class TestReactorEventLoop(EventLoopTestCase, testreactor.ReactorTestCase):      class TestReactorEventLoop(EventLoopTestCase, testreactor.ReactorTestCase):
Line 99 
Line 99 
             EventLoop.call(log.append, 4)              EventLoop.call(log.append, 4)
   
             class IdleTimer(trellis.Component):              class IdleTimer(trellis.Component):
                 trellis.values(                  trellis.attrs(
                     idle_for = NOT_YET,  
                     idle_timeout = 20,                      idle_timeout = 20,
                     busy = False,                      busy = False,
                 )                  )
                 trellis.rules(                  idle_for = trellis.maintain(
                     idle_for = lambda self:                      lambda self: self.idle_for.begins_with(not self.busy),
                         self.idle_for.begins_with(not self.busy)                      initially=NOT_YET
                 )                  )
                   trellis.maintain()  # XXX should be perform
                 def alarm(self):                  def alarm(self):
                     if self.idle_for[self.idle_timeout] and EventLoop.running:                      if self.idle_for[self.idle_timeout] and EventLoop.running:
                         log.append(5)                          log.append(5)
                         EventLoop.stop()                          EventLoop.stop()
                 alarm = trellis.rule(alarm)  
   
             it = IdleTimer()              it = IdleTimer()
             EventLoop.run()              EventLoop.run()
Line 121 
Line 120 
   
   
   
   
 class TestLinks(unittest.TestCase):  class TestLinks(unittest.TestCase):
   
     def setUp(self):      def setUp(self):
Line 218 
Line 218 
         # Verify correct cleanup in all scenarios          # Verify correct cleanup in all scenarios
         for k,v in dict(          for k,v in dict(
             undo=[], managers={}, queues={}, layers=[], reads={}, writes={},              undo=[], managers={}, queues={}, layers=[], reads={}, writes={},
             has_run={}, last_listener=None, last_notified=None, last_save=None,              has_run={}, destinations=None, routes=None,
             current_listener=None, readonly=False, in_cleanup=False,              current_listener=None, readonly=False, in_cleanup=False,
             active=False, at_commit=[], to_retry={}              active=False, at_commit=[], to_retry={}
         ).items():          ).items():
Line 324 
Line 324 
         self.assertEqual(self.ctrl.writes, {self.s2:self.t0})          self.assertEqual(self.ctrl.writes, {self.s2:self.t0})
         self.ctrl.reads.clear()     # these would normally be handled by          self.ctrl.reads.clear()     # these would normally be handled by
         self.ctrl.writes.clear()    # the run() method's try/finally          self.ctrl.writes.clear()    # the run() method's try/finally
           self.ctrl.current_listener = None   # reset
   
     d(a)      d(a)
     def testNoReadDuringCommit(self):      def testNoReadDuringCommit(self):
Line 344 
Line 344 
         sp = self.ctrl.savepoint(); self.ctrl.has_run[self.t1] = self.t1          sp = self.ctrl.savepoint(); self.ctrl.has_run[self.t1] = self.t1
         self.ctrl._process_writes(self.t1)          self.ctrl._process_writes(self.t1)
         # Only t0 is notified, not t1, since t1 is the listener          # Only t0 is notified, not t1, since t1 is the listener
         self.assertEqual(self.ctrl.last_notified, {self.t0: 1})  
         self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})          self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
         self.assertEqual(self.ctrl.last_notified, None)          self.ctrl.current_listener = None   # reset
   
     d(a)      d(a)
     def testDependencyUpdatingAndUndo(self):      def testDependencyUpdatingAndUndo(self):
Line 363 
Line 362 
         self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])          self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
         self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])          self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])
           self.ctrl.current_listener = None   # reset
   
   
   
   
     def runAs(self, listener, rule, initialized=True):      def runAs(self, listener, rule):
         listener.run = rule          listener.run = rule
         self.ctrl.run_rule(listener, initialized)          self.ctrl.run_rule(listener)
   
     d(a)      d(a)
     def testIsRunningAndHasRan(self):      def testIsRunningAndHasRan(self):
         def rule():          def rule():
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.assertEqual(self.ctrl.last_listener, self.t1)              self.assertEqual(self.ctrl.has_run, {self.t1: 0})
             self.assertEqual(self.ctrl.has_run, {self.t1: self.t1})  
         sp = self.ctrl.savepoint()          sp = self.ctrl.savepoint()
         self.runAs(self.t1, rule)          self.runAs(self.t1, rule)
         self.assertEqual(self.ctrl.last_save, sp)  
         self.assertEqual(self.ctrl.current_listener, None)          self.assertEqual(self.ctrl.current_listener, None)
         self.assertEqual(self.ctrl.last_listener, self.t1)          self.assertEqual(self.ctrl.has_run, {self.t1: 0})
         self.assertEqual(self.ctrl.has_run, {self.t1: self.t1})  
         self.ctrl.rollback_to(sp)   # should clear last_listener, last_save  
   
     d(a)      d(a)
     def testIsRunningButHasNotRan(self):      def testIsRunningButHasNotRan(self):
         def rule():          def rule():
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.assertEqual(self.ctrl.last_listener, None)  
             self.assertEqual(self.ctrl.has_run, {})              self.assertEqual(self.ctrl.has_run, {})
         sp = self.ctrl.savepoint()          sp = self.ctrl.savepoint()
         self.runAs(self.t1, rule, False)    # uninit'd rule          self.t1.run = rule; self.ctrl.initialize(self.t1)    # uninit'd rule
         self.assertEqual(self.ctrl.last_save, None)  
         self.assertEqual(self.ctrl.current_listener, None)          self.assertEqual(self.ctrl.current_listener, None)
         self.assertEqual(self.ctrl.last_listener, None)  
         self.assertEqual(self.ctrl.has_run, {})          self.assertEqual(self.ctrl.has_run, {})
         self.ctrl.rollback_to(sp)   # should clear last_listener, last_save  
   
     d(a)  
     def testClearLastListener(self):  
         self.runAs(self.t1, lambda:1)  
         self.assertEqual(self.ctrl.last_listener, self.t1)  
         # last_listener should be cleared by cleanup()  
   
     d(a)      d(a)
     def testScheduleUndo(self):      def testScheduleUndo(self):
Line 413 
Line 399 
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
         self.assertEqual(self.ctrl.queues, {})          self.assertEqual(self.ctrl.queues, {})
   
       def testNestedReadOnly(self):
           log = []
           def aRule():
               log.append(trellis.ctrl.readonly); return 1
           c1 = trellis.Cell(aRule)
           c2 = trellis.Cell(lambda: c1.value * aRule())
           c3 = trellis.Performer(lambda: c2.value)
           self.assertEqual(log, [True, True])
   
     d(a)      d(a)
     def testWriteProcessingInRun(self):      def testWriteProcessingInRun(self):
Line 435 
Line 421 
         self.runAs(self.t1, rule)          self.runAs(self.t1, rule)
         # Only t0 is notified, not t1, since t1 is the listener & t3 is !dirty          # Only t0 is notified, not t1, since t1 is the listener & t3 is !dirty
         self.assertEqual(self.ctrl.writes, {})          self.assertEqual(self.ctrl.writes, {})
         self.assertEqual(self.ctrl.last_notified, {self.t0: 1})  
         self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})          self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})
         self.ctrl.cancel(self.t0)          self.ctrl.cancel(self.t0)
   
Line 463 
Line 448 
         self.assertEqual(self.ctrl.readonly, False)          self.assertEqual(self.ctrl.readonly, False)
   
   
   
     d(a)      d(a)
     def testRunClearsReadWriteOnError(self):      def testRunClearsReadWriteOnError(self):
         def rule():          def rule():
Line 498 
Line 484 
         try:          try:
             self.ctrl._retry()              self.ctrl._retry()
         except stm.CircularityError, e:          except stm.CircularityError, e:
             self.assertEqual(e.args[0],              self.assertEqual(e.args[0], {self.t0: set([self.t1]),
                 {self.t0: set([self.t1]), self.t1: set([self.t2]),                  self.t1: set([self.t2]), self.t2: set([self.t0, self.t1])})
                  self.t2: set([self.t0, self.t1])})              self.assertEqual(e.args[1], (self.t1, self.t2, self.t0))
         else:          else:
             raise AssertionError("Should've caught a cycle")              raise AssertionError("Should've caught a cycle")
   
Line 508 
Line 494 
     def testSimpleRetry(self):      def testSimpleRetry(self):
         def rule():          def rule():
             pass              pass
         sp = self.ctrl.savepoint()  
         self.runAs(self.t0, rule)          self.runAs(self.t0, rule)
         self.runAs(self.t1, rule)          self.runAs(self.t1, rule)
         self.runAs(self.t2, rule)          self.runAs(self.t2, rule)
           self.assertEqual(set(self.ctrl.has_run),set([self.t0,self.t1,self.t2]))
         self.ctrl.to_retry[self.t1]=1          self.ctrl.to_retry[self.t1]=1
         self.ctrl._retry(); self.ctrl.to_retry.clear()          self.ctrl._retry()
         self.assertEqual(self.ctrl.last_listener, self.t0)          self.assertEqual(set(self.ctrl.has_run), set([self.t0]))
         self.assertEqual(self.ctrl.last_save, sp)  
         self.ctrl.to_retry[self.t0]=1          self.ctrl.to_retry[self.t0]=1
         self.ctrl._retry(); self.ctrl.to_retry.clear()          self.ctrl._retry()
         self.assertEqual(self.ctrl.last_save, None)  
   
     d(a)      d(a)
     def testNestedNoRetry(self):      def testNestedNoRetry(self):
         def rule0():          def rule0():
             self.runAs(self.t1, rule1, False)              self.t1.run=rule1; self.ctrl.initialize(self.t1)
         def rule1():          def rule1():
             pass              pass
         self.runAs(self.t2, rule1)          self.runAs(self.t2, rule1)
         self.runAs(self.t0, rule0)          self.runAs(self.t0, rule0)
         self.ctrl.schedule(self.t1)          self.ctrl.schedule(self.t1)
         self.assertEqual(self.ctrl.to_retry, {})          self.assertEqual(self.ctrl.to_retry, {})
         self.assertEqual(self.ctrl.last_listener, self.t0)          self.assertEqual(
         self.assertEqual(self.ctrl.to_retry, {})              set(self.ctrl.has_run), set([self.t0, self.t2])
           )
         self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})          self.assertEqual(self.ctrl.queues, {1: {self.t1:1}})
   
   
   
   
   
   
   
   
   
   
   
   
     def testRunScheduled(self):      def testRunScheduled(self):
         log = []          log = []
         self.t1.run = lambda: log.append(True)          self.t1.run = lambda: log.append(True)
Line 554 
Line 530 
         self.assertEqual(log, [True])          self.assertEqual(log, [True])
   
   
   
     def testRollbackReschedules(self):      def testRollbackReschedules(self):
         sp = []          sp = []
         def rule0():          def rule0():
Line 566 
Line 543 
             sp.append(self.ctrl.savepoint())              sp.append(self.ctrl.savepoint())
         self.ctrl.atomically(go)          self.ctrl.atomically(go)
   
     def testManagerCanCreateLoop(self):      def testManagerCantCreateLoop(self):
         class Mgr:          class Mgr:
             def __enter__(self): pass              def __enter__(self): pass
             def __exit__(*args):              def __exit__(*args):
Line 577 
Line 554 
         self.t1.run = rule1          self.t1.run = rule1
         self.t0.run = lambda:self.ctrl.manage(Mgr())          self.t0.run = lambda:self.ctrl.manage(Mgr())
         self.ctrl.atomically(self.ctrl.schedule, self.t0)          self.ctrl.atomically(self.ctrl.schedule, self.t0)
           self.assertEqual(log, [])
           self.ctrl.atomically(lambda:None)
         self.assertEqual(log, [True])          self.assertEqual(log, [True])
   
   
   
   
   
   
   
   
     d(a)      d(a)
     def testNotifyOnChange(self):      def testNotifyOnChange(self):
         stm.Link(self.s2, self.t2)          stm.Link(self.s2, self.t2)
Line 596 
Line 568 
         self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})          self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})
         self.ctrl.cancel(self.t2)          self.ctrl.cancel(self.t2)
         self.ctrl.writes.clear()          self.ctrl.writes.clear()
           self.ctrl.current_listener = None   # reset
   
   
   
     def testCommitCanLoop(self):      def testCommitCanLoop(self):
         log=[]          log=[]
Line 614 
Line 589 
         self.ctrl.on_undo(undo)          self.ctrl.on_undo(undo)
         self.ctrl.rollback_to(0)          self.ctrl.rollback_to(0)
   
       d(a)
       def testReentrantRollbackToMinimumTarget(self):
           sp = self.ctrl.savepoint()
           # these 2 rollbacks will be ignored, since they target a higher sp.
           # note that both are needed for testing, as one is there to potentially
           # set a new target, and the other is there to make the offset wrong if
           # the rollback stops prematurely.
           self.ctrl.on_undo(self.ctrl.rollback_to, sp+100)
           self.ctrl.on_undo(self.ctrl.rollback_to, sp+100)
           sp2 = self.ctrl.savepoint()
   
           # ensure that there's no way this test can pass unless rollback_to
           # notices re-entrant invocations (because it would overflow the stack)
           for i in range(sys.getrecursionlimit()*2):
               # request a rollback all the way to 0; this target should be used
               # in place of the sp2 target or sp+100 targets, since it will be
               # the lowest target encountered during the rollback.
               self.ctrl.on_undo(self.ctrl.rollback_to, sp)
   
           self.ctrl.rollback_to(sp2) # ask to rollback to posn 2
           self.assertEqual(self.ctrl.savepoint(), sp)  # but should rollback to 0
   
   
   
   
   
   
   
   
   
   
     d(a)      d(a)
     def testNestedRule(self):      def testNestedRule(self):
         def rule1():          def rule1():
             self.assertEqual(self.ctrl.last_listener, self.t1)              self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.ctrl.used(self.s1)              self.ctrl.used(self.s1)
             self.ctrl.changed(self.s2)              self.ctrl.changed(self.s2)
             self.assertEqual(self.ctrl.reads, {self.s1:1})              self.assertEqual(self.ctrl.reads, {self.s1:1})
             self.assertEqual(self.ctrl.writes, {self.s2:self.t1})              self.assertEqual(self.ctrl.writes, {self.s2:self.t1})
             self.runAs(self.t2, rule2, False)              self.t2.run=rule2; self.ctrl.initialize(self.t2)
             self.assertEqual(self.ctrl.last_listener, self.t1)              self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.assertEqual(self.ctrl.reads, {self.s1:1})              self.assertEqual(self.ctrl.reads, {self.s1:1})
             self.assertEqual(self.ctrl.writes, {self.s2:self.t1, s3:self.t2})              self.assertEqual(self.ctrl.writes, {self.s2:self.t1, s3:self.t2})
   
         def rule2():          def rule2():
             self.assertEqual(self.ctrl.last_listener, self.t1)              self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
             self.assertEqual(self.ctrl.current_listener, self.t2)              self.assertEqual(self.ctrl.current_listener, self.t2)
             self.assertEqual(self.ctrl.reads, {})              self.assertEqual(self.ctrl.reads, {})
             self.assertEqual(self.ctrl.writes, {self.s2:self.t1})              self.assertEqual(self.ctrl.writes, {self.s2:self.t1})
Line 656 
Line 642 
         s3 = TestSubject(); s3.name = 's3'          s3 = TestSubject(); s3.name = 's3'
         self.runAs(self.t0, rule0)          self.runAs(self.t0, rule0)
         self.runAs(self.t1, rule1)          self.runAs(self.t1, rule1)
         self.assertEqual(self.ctrl.has_run,          self.assertEqual(
             {self.t1:self.t1, self.t0: self.t0}  # t2 was new, so doesn't show              set(self.ctrl.has_run),
               set([self.t1, self.t0])  # t2 was new, so doesn't show
         )          )
         self.assertEqual(list(self.t1.iter_subjects()), [self.s1])          self.assertEqual(list(self.t1.iter_subjects()), [self.s1])
         self.assertEqual(list(self.t2.iter_subjects()), [self.s2])          self.assertEqual(list(self.t2.iter_subjects()), [self.s2])
         self.ctrl.rollback_to(self.ctrl.last_save)  # should undo both t1/t2          self.ctrl.rollback_to(self.ctrl.has_run[self.t1])  # should undo both t1/t2
         self.assertEqual(self.ctrl.last_listener, self.t0)  
   
   
   
   
       def testUndoLogSpansMultipleRecalcs(self):
           c1 = trellis.Value(False, discrete=True)
           c2 = trellis.Cell(lambda: (c1.value, log.append(trellis.savepoint())))
           log = []; c2.value; log = []; c1.value = True
           self.failUnless(len(log)==2 and log[1]>log[0], log)
   
       def testUndoPostCommitCancelsUndoOfCommitSchedule(self):
           c1 = trellis.Value(False, discrete=True)
           def c2():
               c1.value
               log.append(trellis.savepoint())
               if len(log)==2:
                   raise DummyError
           c2 = trellis.Cell(c2)
           log = []; c2.value; log = [];
           # This will raise a different error if undoing the on-commit stack
           # causes an underflow:
           self.assertRaises(DummyError, setattr, c1, 'value', True)
   
   
   class TestTime(unittest.TestCase):
   
       def testIndependentNextEventTime(self):
           # Ensure that next_event_time() never returns a *past* time
           t = Time()
           t.auto_update = False
           t20 = t[20]
           t40 = t[40]
           d(trellis.Cell)
           def check_reached():
               t.reached(t20)
               t.reached(t40)
               nt = t.next_event_time(True)
               self.failIf(nt is not None and nt<=0)
           check_reached.value
           t.advance(25)
           t.advance(15)
   
   
   
   
Line 914 
Line 941 
   
   
   
       d(a)
       def testPartialRollbackList(self):
           c1 = trellis.Cell(value=42)
           l = trellis.List()
           l.append(1)
           self.assertEqual(l.future, [1])
           sp = self.ctrl.savepoint()
           self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
           l.append(2)
           self.assertEqual(l.future, [1, 2])
           self.ctrl.rollback_to(sp)
           self.assertEqual(l.future, [1])
   
       d(a)
       def testPartialRollbackDict(self):
           c1 = trellis.Cell(lambda:None)
           d = trellis.Dict()
           d[1] = 2
           self.assertEqual(d.added, {1:2})
           sp = self.ctrl.savepoint()
           self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
           d[2] = 3
           self.assertEqual(d.added, {1:2, 2:3})
           self.ctrl.rollback_to(sp)
           self.assertEqual(d.added, {1:2})
   
       d(a)
       def testPartialRollbackSet(self):
           c1 = trellis.Cell(lambda:None)
           s = trellis.Set()
           s.add(1)
           self.assertEqual(list(s.added), [1])
           sp = self.ctrl.savepoint()
           self.ctrl.change_attr(self.ctrl, 'current_listener', c1)
           s.add(2)
           self.assertEqual(list(s.added), [1, 2])
           self.ctrl.rollback_to(sp)
           self.assertEqual(list(s.added), [1])
   
   
   
       def run_modifier_and_rule(self, func, rule):
           d(self.ctrl.atomically)
           def go():
               self.ctrl.schedule(trellis.Cell(rule))
               func.sp = self.ctrl.savepoint()
               trellis.modifier(func)()
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
       def testSetShouldOverrideInitialCalculatedValue(self):
           class C(trellis.Component):
               trellis.maintain(optional=True)
               def calc(self):
                   return 0
               trellis.maintain()
               def getx(self):
                   self.calc
               trellis.maintain()
               def set(self):
                   # This should not conflict with .calc setting itself to 0
                   self.calc = 1
               def __init__(self):
                   self.getx
                   self.set
           c = C()
   
       def testMakeDuringPerform(self):
           class C1(trellis.Component):
               x = trellis.attr()
               trellis.maintain()
               def rule(self):
                   self.x = 1
   
           class C2(trellis.Component):
               c1 = trellis.make(C1)
               trellis.compute()
               def calc(self):
                   return self.c1.x
           C2().calc
   
       def __testMaintainReassign(self):
           class C(trellis.Component):
               x = trellis.attr()
               trellis.maintain()
               def rule(self):
                   self.x = 10
           d(trellis.atomically)
           def test():
               C(x = 1)
   
       def testFalsePositiveDepCycle(self):
   
           c1 = trellis.Cell(value=1)
   
           d(trellis.Cell)
           def c2():
               return c1.value+1
   
           d(trellis.Cell)
           def c3():
               return c1.value+c2.value
   
           self.assertEqual(c3.value, 3)
   
           d(trellis.Cell)
           def c5():
               c1.value = 27
   
           d(trellis.atomically)
           def doit():
               c5.value
               for c in c2, c3:
                   trellis.ctrl.has_run.setdefault(c, 1)
                   trellis.on_undo(trellis.ctrl.has_run.pop, c)
                   trellis.ctrl.to_retry.setdefault(c, 1)
               trellis.on_undo(trellis.ctrl._unrun, c2, [c3])
               trellis.ctrl._retry()
   
   
   
   
   
   
   
   
   
   
   
   
   
   
 class TestDefaultEventLoop(unittest.TestCase):  class TestDefaultEventLoop(unittest.TestCase):
   
     def setUp(self):      def setUp(self):
Line 968 
Line 1159 
         self.loop.flush()          self.loop.flush()
         self.assertEqual(log, [1, 3])          self.assertEqual(log, [1, 3])
   
       def testScheduleUndo(self):
           t = Time()
           t.auto_update = False
           t20 = t[20]
           log = []
           d(trellis.Cell)
           def checktime():
               t.reached(t20)
               log.append(t._events[t20._when])
               d(trellis.Performer)
               def err_after_reached():
                   if len(t._schedule)>1:
                       raise DummyError
           self.assertRaises(DummyError, checktime.get_value)
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(dict(t._events), {t20._when:log[0]})
           del checktime
           self.failUnless(isinstance(log.pop(), trellis.Sensor))
           self.assertEqual(dict(t._events), {})
           self.assertEqual(log, [])
   
       def force_rollback(self):
           d(trellis.Performer)
           def do_it():
               raise DummyError
   
   
   
       def testUpdateUndo(self):
           t = Time()
           t.auto_update = False
           t20 = t[20]
           d(trellis.Cell)
           def checktime():
               if t.reached(t20):
                   self.force_rollback()
           checktime.value
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(list(t._events), [t20._when])
           self.assertRaises(DummyError, t.advance, 20)
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(list(t._events), [t20._when])
   
   
   
   
   
Line 1005 
Line 1237 
         def f():          def f():
             self.failUnless(self.ctrl.active)              self.failUnless(self.ctrl.active)
             log.append(1)              log.append(1)
             yield trellis.Pause              yield activity.Pause
             self.failUnless(self.ctrl.active)              self.failUnless(self.ctrl.active)
             log.append(2)              log.append(2)
         t = trellis.TaskCell(f)          t = activity.TaskCell(f)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         t._loop.flush()          t._loop.flush()
         self.assertEqual(log, [1])          self.assertEqual(log, [1])
Line 1024 
Line 1256 
             while v.value:              while v.value:
                 log.append(v.value)                  log.append(v.value)
                 v1.value = v.value                  v1.value = v.value
                 yield trellis.Pause                  yield activity.Pause
         t = trellis.TaskCell(f)          t = activity.TaskCell(f)
         check = []          check = []
         for j in 42, 57, 99, 106, 23, None:          for j in 42, 57, 99, 106, 23, None:
             self.assertEqual(log, check)              self.assertEqual(log, check)
Line 1040 
Line 1272 
     def testPauseAndCall(self):      def testPauseAndCall(self):
         log = []          log = []
         class TaskExample(trellis.Component):          class TaskExample(trellis.Component):
             trellis.values(              trellis.attrs(
                 start = False,                  start = False,
                 stop = False                  stop = False
             )              )
Line 1048 
Line 1280 
             def wait_for_start(self):              def wait_for_start(self):
                 log.append("waiting to start")                  log.append("waiting to start")
                 while not self.start:                  while not self.start:
                     yield trellis.Pause                      yield activity.Pause
   
             def wait_for_stop(self):              def wait_for_stop(self):
                 while not self.stop:                  while not self.stop:
                     log.append("waiting to stop")                      log.append("waiting to stop")
                     yield trellis.Pause                      yield activity.Pause
   
             d(trellis.task)              activity.task()
             def demo(self):              def demo(self):
                 yield self.wait_for_start()                  yield self.wait_for_start()
                 log.append("starting")                  log.append("starting")
Line 1083 
Line 1315 
         def f1():          def f1():
             yield 42              yield 42
         def f2():          def f2():
             yield f1(); yield trellis.resume()              yield f1(); yield activity.resume()
         def f3():          def f3():
             yield f2(); v = trellis.resume()              yield f2(); v = activity.resume()
             log.append(v)              log.append(v)
   
         t = trellis.TaskCell(f3)          t = activity.TaskCell(f3)
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [42])          self.assertEqual(log, [42])
   
         log = []          log = []
         def f1():          def f1():
             yield trellis.Return(42)              yield activity.Return(42)
   
         t = trellis.TaskCell(f3)          t = activity.TaskCell(f3)
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [42])          self.assertEqual(log, [42])
   
Line 1107 
Line 1339 
             raise DummyError              raise DummyError
         def f2():          def f2():
             try:              try:
                 yield f1(); trellis.resume()                  yield f1(); activity.resume()
             except DummyError:              except DummyError:
                 log.append(True)                  log.append(True)
             else:              else:
                 pass                  pass
   
         t = trellis.TaskCell(f2)          t = activity.TaskCell(f2)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [True])          self.assertEqual(log, [True])
Line 1137 
Line 1369 
   
             def throw(self, typ, val, tb):              def throw(self, typ, val, tb):
                 log.append(typ)                  log.append(typ)
                 log.append(type(val))                  log.append(val.__class__)   # type(val) is instance in Py<2.5
                 log.append(type(tb))                  log.append(type(tb))
                 raise StopIteration                  raise StopIteration
   
         def fs(): yield SendThrowIter()          def fs(): yield SendThrowIter()
         t = trellis.TaskCell(fs)          t = activity.TaskCell(fs)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType])          self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType])
Line 1160 
Line 1392 
   
   
   
 def additional_tests():      def testResumeOnlyOnceUntilFlushed(self):
     import doctest, sys          log = []
     files = [          c1 = trellis.Value(1)
         'README.txt', 'STM-Observer.txt', 'Collections.txt', 'Internals.txt',          c2 = trellis.Value(2)
         'Specification.txt',          def f():
     ][(sys.version<'2.4')*3:]   # README.txt uses decorator syntax              for i in range(3):
     return doctest.DocFileSuite(                  c1.value, c2.value
         optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files                  log.append(i)
     )                  yield activity.Pause
   
           t = activity.TaskCell(f)
           self.assertEqual(log, [])
           EventLoop.flush()
           self.assertEqual(log, [0])
           c1.value = 3
           self.assertEqual(log, [0])
           c2.value = 4
           EventLoop.flush()
           self.assertEqual(log, [0, 1])
   
   
   
   
   
   
   
   
   
   
Line 1184 
Line 1433 
   
   
   
       def testNoTodoRollbackIntoTask(self):
           class CV(trellis.Component):
               v = trellis.attr(False)
               s = trellis.make(trellis.Set)
   
               trellis.maintain()
               def maintain(self):
                   if self.v:
                       self.s.add(1)
                   else:
                       self.s.discard(1)
   
               trellis.perform()
               def perform(s):
                   self.assertEqual(s.v, True)
   
           d(activity.TaskCell)
           def task():
               cv = CV()
               cv.v = True
               yield activity.Pause
   
           EventLoop.run()
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   def additional_tests():
       import doctest, sys
       files = [
           'README.txt', 'STM-Observer.txt', 'Activity.txt', 'Collections.txt',
           'Internals.txt',
       ][(sys.version<'2.4')*3:]   # All but Internals+Collections use decorator syntax
       try:
           from sqlalchemy.orm.attributes import ClassManager
       except ImportError:
           pass
       else:
           files.insert(0, 'SQLAlchemy.txt')
       return doctest.DocFileSuite(
           optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files
       )
   
   
   
   
   
   
   
   
   
   
   
   


Generate output suitable for use with a patch program
Legend:
Removed from v.2436  
changed lines
  Added in v.2595

cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help