[Subversion] / Trellis / test_trellis.py  

Diff of /Trellis/test_trellis.py

Parent Directory | Revision Log

version 2447, Fri Dec 28 16:20:47 2007 UTC version 2542, Thu May 15 16:06:30 2008 UTC
Line 1 
Line 1 
 from test_sets import *  from test_sets import *
 from peak import context  from peak import context
 from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET  from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET
 from peak.events import trellis, stm  from peak.events import trellis, stm, collections, activity
 from peak.util.decorators import rewrap, decorate as d  from peak.util.decorators import rewrap, decorate as d
 from peak.util.extremes import Max  from peak.util.extremes import Max
 import unittest, heapq, mocker, types, sys  import unittest, heapq, mocker, types, sys
Line 99 
Line 99 
             EventLoop.call(log.append, 4)              EventLoop.call(log.append, 4)
   
             class IdleTimer(trellis.Component):              class IdleTimer(trellis.Component):
                 trellis.values(                  trellis.attrs(
                     idle_for = NOT_YET,  
                     idle_timeout = 20,                      idle_timeout = 20,
                     busy = False,                      busy = False,
                 )                  )
                 trellis.rules(                  idle_for = trellis.maintain(
                     idle_for = lambda self:                      lambda self: self.idle_for.begins_with(not self.busy),
                         self.idle_for.begins_with(not self.busy)                      initially=NOT_YET
                 )                  )
                   trellis.maintain()  # XXX should be perform
                 def alarm(self):                  def alarm(self):
                     if self.idle_for[self.idle_timeout] and EventLoop.running:                      if self.idle_for[self.idle_timeout] and EventLoop.running:
                         log.append(5)                          log.append(5)
                         EventLoop.stop()                          EventLoop.stop()
                 alarm = trellis.rule(alarm)  
   
             it = IdleTimer()              it = IdleTimer()
             EventLoop.run()              EventLoop.run()
Line 121 
Line 120 
   
   
   
   
 class TestLinks(unittest.TestCase):  class TestLinks(unittest.TestCase):
   
     def setUp(self):      def setUp(self):
Line 324 
Line 324 
         self.assertEqual(self.ctrl.writes, {self.s2:self.t0})          self.assertEqual(self.ctrl.writes, {self.s2:self.t0})
         self.ctrl.reads.clear()     # these would normally be handled by          self.ctrl.reads.clear()     # these would normally be handled by
         self.ctrl.writes.clear()    # the run() method's try/finally          self.ctrl.writes.clear()    # the run() method's try/finally
           self.ctrl.current_listener = None   # reset
   
     d(a)      d(a)
     def testNoReadDuringCommit(self):      def testNoReadDuringCommit(self):
Line 346 
Line 346 
         # Only t0 is notified, not t1, since t1 is the listener          # Only t0 is notified, not t1, since t1 is the listener
         self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})          self.assertEqual(self.ctrl.queues, {2: {self.t0:1}})
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
           self.ctrl.current_listener = None   # reset
   
     d(a)      d(a)
     def testDependencyUpdatingAndUndo(self):      def testDependencyUpdatingAndUndo(self):
Line 362 
Line 362 
         self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])          self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1])
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
         self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])          self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1])
           self.ctrl.current_listener = None   # reset
   
   
   
     def runAs(self, listener, rule, initialized=True):  
       def runAs(self, listener, rule):
         listener.run = rule          listener.run = rule
         self.ctrl.run_rule(listener, initialized)          self.ctrl.run_rule(listener)
   
     d(a)      d(a)
     def testIsRunningAndHasRan(self):      def testIsRunningAndHasRan(self):
Line 383 
Line 387 
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.assertEqual(self.ctrl.has_run, {})              self.assertEqual(self.ctrl.has_run, {})
         sp = self.ctrl.savepoint()          sp = self.ctrl.savepoint()
         self.runAs(self.t1, rule, False)    # uninit'd rule          self.t1.run = rule; self.ctrl.initialize(self.t1)    # uninit'd rule
         self.assertEqual(self.ctrl.current_listener, None)          self.assertEqual(self.ctrl.current_listener, None)
         self.assertEqual(self.ctrl.has_run, {})          self.assertEqual(self.ctrl.has_run, {})
   
Line 395 
Line 399 
         self.ctrl.rollback_to(sp)          self.ctrl.rollback_to(sp)
         self.assertEqual(self.ctrl.queues, {})          self.assertEqual(self.ctrl.queues, {})
   
       def testNestedReadOnly(self):
           log = []
           def aRule():
               log.append(trellis.ctrl.readonly); return 1
           c1 = trellis.Cell(aRule)
           c2 = trellis.Cell(lambda: c1.value * aRule())
           c3 = trellis.Performer(lambda: c2.value)
           self.assertEqual(log, [True, True])
   
   
   
   
   
     d(a)      d(a)
     def testWriteProcessingInRun(self):      def testWriteProcessingInRun(self):
Line 508 
Line 508 
     d(a)      d(a)
     def testNestedNoRetry(self):      def testNestedNoRetry(self):
         def rule0():          def rule0():
             self.runAs(self.t1, rule1, False)              self.t1.run=rule1; self.ctrl.initialize(self.t1)
         def rule1():          def rule1():
             pass              pass
         self.runAs(self.t2, rule1)          self.runAs(self.t2, rule1)
Line 543 
Line 543 
             sp.append(self.ctrl.savepoint())              sp.append(self.ctrl.savepoint())
         self.ctrl.atomically(go)          self.ctrl.atomically(go)
   
     def testManagerCanCreateLoop(self):      def testManagerCantCreateLoop(self):
         class Mgr:          class Mgr:
             def __enter__(self): pass              def __enter__(self): pass
             def __exit__(*args):              def __exit__(*args):
Line 554 
Line 554 
         self.t1.run = rule1          self.t1.run = rule1
         self.t0.run = lambda:self.ctrl.manage(Mgr())          self.t0.run = lambda:self.ctrl.manage(Mgr())
         self.ctrl.atomically(self.ctrl.schedule, self.t0)          self.ctrl.atomically(self.ctrl.schedule, self.t0)
           self.assertEqual(log, [])
           self.ctrl.atomically(lambda:None)
         self.assertEqual(log, [True])          self.assertEqual(log, [True])
   
     d(a)      d(a)
Line 566 
Line 568 
         self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})          self.assertEqual(self.ctrl.queues, {2: {self.t2:1}})
         self.ctrl.cancel(self.t2)          self.ctrl.cancel(self.t2)
         self.ctrl.writes.clear()          self.ctrl.writes.clear()
           self.ctrl.current_listener = None   # reset
   
   
   
   
   
Line 622 
Line 622 
             self.ctrl.changed(self.s2)              self.ctrl.changed(self.s2)
             self.assertEqual(self.ctrl.reads, {self.s1:1})              self.assertEqual(self.ctrl.reads, {self.s1:1})
             self.assertEqual(self.ctrl.writes, {self.s2:self.t1})              self.assertEqual(self.ctrl.writes, {self.s2:self.t1})
             self.runAs(self.t2, rule2, False)              self.t2.run=rule2; self.ctrl.initialize(self.t2)
             self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))              self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1]))
             self.assertEqual(self.ctrl.current_listener, self.t1)              self.assertEqual(self.ctrl.current_listener, self.t1)
             self.assertEqual(self.ctrl.reads, {self.s1:1})              self.assertEqual(self.ctrl.reads, {self.s1:1})
Line 654 
Line 654 
   
   
   
       def testUndoLogSpansMultipleRecalcs(self):
           c1 = trellis.Value(False, discrete=True)
           c2 = trellis.Cell(lambda: (c1.value, log.append(trellis.savepoint())))
           log = []; c2.value; log = []; c1.value = True
           self.failUnless(len(log)==2 and log[1]>log[0], log)
   
       def testUndoPostCommitCancelsUndoOfCommitSchedule(self):
           c1 = trellis.Value(False, discrete=True)
           def c2():
               c1.value
               log.append(trellis.savepoint())
               if len(log)==2:
                   raise DummyError
           c2 = trellis.Cell(c2)
           log = []; c2.value; log = [];
           # This will raise a different error if undoing the on-commit stack
           # causes an underflow:
           self.assertRaises(DummyError, setattr, c1, 'value', True)
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
 class TestCells(mocker.MockerTestCase):  class TestCells(mocker.MockerTestCase):
   
     ctrl = stm.ctrl      ctrl = stm.ctrl
Line 900 
Line 941 
   
   
   
       d(a)
       def testTodoRollbackFuture(self):
           sp = self.ctrl.savepoint()
           tv = trellis.TodoValue(dict)
           self.assertEqual(tv._savepoint, None)
           tv.get_future()[1] = 2
           self.assertEqual(tv._savepoint, sp)
           sp2 = self.ctrl.savepoint()
           tv.get_future()[2] = 3
           self.assertEqual(tv._savepoint, sp)
           self.ctrl.rollback_to(sp2)
           self.assertEqual(self.ctrl.savepoint(), sp)
           self.assertEqual(tv._savepoint, None)
   
       d(a)
       def testTodoRollbackSet(self):
           sp = self.ctrl.savepoint()
           tv = trellis.TodoValue(dict)
           self.assertEqual(tv._savepoint, None)
           tv.get_future()[1] = 2
           self.assertEqual(tv._savepoint, sp)
           sp2 = self.ctrl.savepoint()
           tv.value = {2:3}
           self.assertEqual(tv._savepoint, sp)
           self.ctrl.rollback_to(sp2)
           self.assertEqual(self.ctrl.savepoint(), sp)
           self.assertEqual(tv._savepoint, None)
   
       d(a)
       def testFullRollbackList(self):
           l = trellis.List()
           sp = self.ctrl.savepoint()
           l.append(1)
           self.ctrl.on_undo(lambda:None)
           sp2 = self.ctrl.savepoint()
           l.append(2)
           self.ctrl.rollback_to(sp2)
           self.assertEqual(self.ctrl.savepoint(), sp)
   
   
   
       d(a)
       def testFullRollbackDict(self):
           d = trellis.Dict()
           sp = self.ctrl.savepoint()
           d[1] = 2
           self.ctrl.on_undo(lambda:None)
           sp2 = self.ctrl.savepoint()
           d[2] = 3
           self.ctrl.rollback_to(sp2)
           self.assertEqual(self.ctrl.savepoint(), sp)
   
       d(a)
       def testFullRollbackSet(self):
           s = trellis.Set()
           sp = self.ctrl.savepoint()
           s.add(1)
           self.ctrl.on_undo(lambda:None)
           sp2 = self.ctrl.savepoint()
           s.add(2)
           self.ctrl.rollback_to(sp2)
           self.assertEqual(self.ctrl.savepoint(), sp)
   
       def run_modifier_and_rule(self, func, rule):
           d(self.ctrl.atomically)
           def go():
               self.ctrl.schedule(trellis.Cell(rule))
               func.sp = self.ctrl.savepoint()
               trellis.modifier(func)()
   
   
   
   
   
   
   
   
   
   
   
   
   
       def testDictUndo(self):
           def do_it():
               dd[1] = 2
               self.ctrl.on_undo(lambda:None)
               do_it.sp2 = self.ctrl.savepoint()
               dd[4] = 6
               del dd[5]
           def rule():
               if dict(dd)=={4:5, 5:6}: return
               self.assertEqual(dict(dd), {1:2, 4:6})
               self.ctrl.rollback_to(do_it.sp2)
               self.assertEqual(self.ctrl.savepoint(), do_it.sp)
           dd = trellis.Dict()
           dd[4] = 5
           dd[5] = 6
           self.assertEqual(dict(dd), {4:5, 5:6})
           self.run_modifier_and_rule(do_it, rule)
           self.assertEqual(dict(dd), {4:5, 5:6})
   
       def testSetAndObservingUndo(self):
           def do_it():
               s.add(1)
               self.ctrl.on_undo(lambda:None)
               do_it.sp2 = self.ctrl.savepoint()
               s.add(3)
               s.remove(4)
           def rule():
               if set(s)==set([4,5]): return
               self.assertEqual(set(s), set([1,3,5]))
               self.ctrl.rollback_to(do_it.sp2)
               self.assertEqual(self.ctrl.savepoint(), do_it.sp)
           s = trellis.Set([])
           o = collections.Observing(keys=s)
           s.update([4,5])
           self.assertEqual(set(s), set([4,5]))
           self.assertEqual(set(o._watching), set([4,5]))
           self.run_modifier_and_rule(do_it, rule)
           self.assertEqual(set(s), set([4,5]))
           self.assertEqual(set(o._watching), set([4,5]))
   
   
 class TestDefaultEventLoop(unittest.TestCase):  class TestDefaultEventLoop(unittest.TestCase):
   
     def setUp(self):      def setUp(self):
Line 954 
Line 1118 
         self.loop.flush()          self.loop.flush()
         self.assertEqual(log, [1, 3])          self.assertEqual(log, [1, 3])
   
       def testScheduleUndo(self):
           t = Time()
           t.auto_update = False
           t20 = t[20]
           log = []
           d(trellis.Cell)
           def checktime():
               t.reached(t20)
               log.append(t._events[t20._when])
               d(trellis.Performer)
               def err_after_reached():
                   if len(t._schedule)>1:
                       raise DummyError
           self.assertRaises(DummyError, checktime.get_value)
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(dict(t._events), {})
           self.failUnless(isinstance(log.pop(), trellis.Value))
           self.assertEqual(log, [])
   
       def force_rollback(self):
           d(trellis.Performer)
           def do_it():
               raise DummyError
   
   
   
   
   
       def testUpdateUndo(self):
           t = Time()
           t.auto_update = False
           t20 = t[20]
           d(trellis.Cell)
           def checktime():
               if t.reached(t20):
                   self.force_rollback()
           checktime.value
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(list(t._events), [t20._when])
           self.assertRaises(DummyError, t.advance, 20)
           self.assertEqual(t._schedule, [t20._when, Max])
           self.assertEqual(list(t._events), [t20._when])
   
   
   
   
   
Line 991 
Line 1196 
         def f():          def f():
             self.failUnless(self.ctrl.active)              self.failUnless(self.ctrl.active)
             log.append(1)              log.append(1)
             yield trellis.Pause              yield activity.Pause
             self.failUnless(self.ctrl.active)              self.failUnless(self.ctrl.active)
             log.append(2)              log.append(2)
         t = trellis.TaskCell(f)          t = activity.TaskCell(f)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         t._loop.flush()          t._loop.flush()
         self.assertEqual(log, [1])          self.assertEqual(log, [1])
Line 1010 
Line 1215 
             while v.value:              while v.value:
                 log.append(v.value)                  log.append(v.value)
                 v1.value = v.value                  v1.value = v.value
                 yield trellis.Pause                  yield activity.Pause
         t = trellis.TaskCell(f)          t = activity.TaskCell(f)
         check = []          check = []
         for j in 42, 57, 99, 106, 23, None:          for j in 42, 57, 99, 106, 23, None:
             self.assertEqual(log, check)              self.assertEqual(log, check)
Line 1026 
Line 1231 
     def testPauseAndCall(self):      def testPauseAndCall(self):
         log = []          log = []
         class TaskExample(trellis.Component):          class TaskExample(trellis.Component):
             trellis.values(              trellis.attrs(
                 start = False,                  start = False,
                 stop = False                  stop = False
             )              )
Line 1034 
Line 1239 
             def wait_for_start(self):              def wait_for_start(self):
                 log.append("waiting to start")                  log.append("waiting to start")
                 while not self.start:                  while not self.start:
                     yield trellis.Pause                      yield activity.Pause
   
             def wait_for_stop(self):              def wait_for_stop(self):
                 while not self.stop:                  while not self.stop:
                     log.append("waiting to stop")                      log.append("waiting to stop")
                     yield trellis.Pause                      yield activity.Pause
   
             d(trellis.task)              activity.task()
             def demo(self):              def demo(self):
                 yield self.wait_for_start()                  yield self.wait_for_start()
                 log.append("starting")                  log.append("starting")
Line 1069 
Line 1274 
         def f1():          def f1():
             yield 42              yield 42
         def f2():          def f2():
             yield f1(); yield trellis.resume()              yield f1(); yield activity.resume()
         def f3():          def f3():
             yield f2(); v = trellis.resume()              yield f2(); v = activity.resume()
             log.append(v)              log.append(v)
   
         t = trellis.TaskCell(f3)          t = activity.TaskCell(f3)
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [42])          self.assertEqual(log, [42])
   
         log = []          log = []
         def f1():          def f1():
             yield trellis.Return(42)              yield activity.Return(42)
   
         t = trellis.TaskCell(f3)          t = activity.TaskCell(f3)
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [42])          self.assertEqual(log, [42])
   
Line 1093 
Line 1298 
             raise DummyError              raise DummyError
         def f2():          def f2():
             try:              try:
                 yield f1(); trellis.resume()                  yield f1(); activity.resume()
             except DummyError:              except DummyError:
                 log.append(True)                  log.append(True)
             else:              else:
                 pass                  pass
   
         t = trellis.TaskCell(f2)          t = activity.TaskCell(f2)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [True])          self.assertEqual(log, [True])
Line 1128 
Line 1333 
                 raise StopIteration                  raise StopIteration
   
         def fs(): yield SendThrowIter()          def fs(): yield SendThrowIter()
         t = trellis.TaskCell(fs)          t = activity.TaskCell(fs)
         self.assertEqual(log, [])          self.assertEqual(log, [])
         EventLoop.flush()          EventLoop.flush()
         self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType])          self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType])
Line 1146 
Line 1351 
   
   
   
       def testResumeOnlyOnceUntilFlushed(self):
           log = []
           c1 = trellis.Value(1)
           c2 = trellis.Value(2)
           def f():
               for i in range(3):
                   c1.value, c2.value
                   log.append(i)
                   yield activity.Pause
   
           t = activity.TaskCell(f)
           self.assertEqual(log, [])
           EventLoop.flush()
           self.assertEqual(log, [0])
           c1.value = 3
           self.assertEqual(log, [0])
           c2.value = 4
           EventLoop.flush()
           self.assertEqual(log, [0, 1])
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
 def additional_tests():  def additional_tests():
     import doctest, sys      import doctest, sys
     files = [      files = [
         'README.txt', 'STM-Observer.txt', 'Activity.txt', 'Collections.txt',          'README.txt', 'STM-Observer.txt', 'Activity.txt', 'Collections.txt',
         'Internals.txt', 'Specification.txt',          'Internals.txt',
     ][(sys.version<'2.4')*4:]   # README.txt uses decorator syntax      ][(sys.version<'2.4')*4:]   # All but Internals use decorator syntax
       try:
           from sqlalchemy.orm.attributes import ClassManager
       except ImportError:
           pass
       else:
           files.insert(0, 'SQLAlchemy.txt')
     return doctest.DocFileSuite(      return doctest.DocFileSuite(
         optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files          optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files
     )      )
Line 1181 
Line 1433 
   
   
   
   
   
   
   
   
   


Generate output suitable for use with a patch program
Legend:
Removed from v.2447  
changed lines
  Added in v.2542

cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help