from test_sets import * |
from test_sets import * |
from peak import context |
from peak import context |
from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET |
from peak.events.activity import EventLoop, TwistedEventLoop, Time, NOT_YET |
from peak.events import trellis, stm |
from peak.events import trellis, stm, collections, activity |
from peak.util.decorators import rewrap, decorate as d |
from peak.util.decorators import rewrap, decorate as d |
from peak.util.extremes import Max |
from peak.util.extremes import Max |
import unittest, heapq, mocker, types |
import unittest, heapq, mocker, types, sys |
|
|
try: |
try: |
import testreactor |
import testreactor |
return False |
return False |
|
|
|
|
|
try: |
|
set |
|
except NameError: |
|
from sets import Set as set |
|
|
if wx: |
if wx: |
class TestWxEventLoop(EventLoopTestCase): |
class TestWxEventLoop(EventLoopTestCase): |
def configure_context(self): |
def configure_context(self): |
from peak.events.activity import EventLoop, WXEventLoop |
from peak.events.activity import EventLoop, WXEventLoop |
EventLoop <<= WXEventLoop |
EventLoop <<= WXEventLoop |
self.app = wx.PySimpleApp(redirect=False) |
self.app = wx.PySimpleApp(redirect=False) |
self.app.ExitOnFrameDelete = False |
self.frame = wx.Frame(None) |
|
|
def testSequentialCalls(self): |
def testSequentialCalls(self): |
log = [] |
log = [] |
EventLoop.call(log.append, 2) |
EventLoop.call(log.append, 2) |
EventLoop.call(log.append, 3) |
EventLoop.call(log.append, 3) |
EventLoop.call(log.append, 4) |
EventLoop.call(log.append, 4) |
|
event = Time[0.00001] |
|
def c(): |
|
if event: |
|
# events aren't |
EventLoop.call(EventLoop.stop) |
EventLoop.call(EventLoop.stop) |
|
c = trellis.Cell(c) |
|
c.value |
|
|
|
# This will loop indefinitely, if sub-millisecond events aren't |
|
# rounded up to the next millisecond. |
EventLoop.run() |
EventLoop.run() |
|
self.frame.Destroy() |
self.assertEqual(log, [1,2,3,4]) |
self.assertEqual(log, [1,2,3,4]) |
|
|
# XXX this should test timing stuff, but the only way to do that |
# XXX this should test more timing stuff, but the only way to do it |
# is with a wx mock, which I haven't time for as yet. |
# is with a wx mock, which I haven't time for as yet. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if testreactor: |
if testreactor: |
|
|
class TestReactorEventLoop(EventLoopTestCase, testreactor.ReactorTestCase): |
class TestReactorEventLoop(EventLoopTestCase, testreactor.ReactorTestCase): |
EventLoop.call(log.append, 4) |
EventLoop.call(log.append, 4) |
|
|
class IdleTimer(trellis.Component): |
class IdleTimer(trellis.Component): |
trellis.values( |
trellis.attrs( |
idle_for = NOT_YET, |
|
idle_timeout = 20, |
idle_timeout = 20, |
busy = False, |
busy = False, |
) |
) |
trellis.rules( |
idle_for = trellis.maintain( |
idle_for = lambda self: |
lambda self: self.idle_for.begins_with(not self.busy), |
self.idle_for.begins_with(not self.busy) |
initially=NOT_YET |
) |
) |
|
trellis.maintain() # XXX should be perform |
def alarm(self): |
def alarm(self): |
if self.idle_for[self.idle_timeout] and EventLoop.running: |
if self.idle_for[self.idle_timeout] and EventLoop.running: |
log.append(5) |
log.append(5) |
EventLoop.stop() |
EventLoop.stop() |
alarm = trellis.rule(alarm) |
|
|
|
it = IdleTimer() |
it = IdleTimer() |
EventLoop.run() |
EventLoop.run() |
|
|
|
|
|
|
|
|
class TestLinks(unittest.TestCase): |
class TestLinks(unittest.TestCase): |
|
|
def setUp(self): |
def setUp(self): |
# Verify correct cleanup in all scenarios |
# Verify correct cleanup in all scenarios |
for k,v in dict( |
for k,v in dict( |
undo=[], managers={}, queues={}, layers=[], reads={}, writes={}, |
undo=[], managers={}, queues={}, layers=[], reads={}, writes={}, |
has_run={}, last_listener=None, last_notified=None, last_save=None, |
has_run={}, destinations=None, routes=None, |
current_listener=None, readonly=False, in_cleanup=False, |
current_listener=None, readonly=False, in_cleanup=False, |
active=False, at_commit=[], to_retry={} |
active=False, at_commit=[], to_retry={} |
).items(): |
).items(): |
self.assertEqual(self.ctrl.writes, {self.s2:self.t0}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t0}) |
self.ctrl.reads.clear() # these would normally be handled by |
self.ctrl.reads.clear() # these would normally be handled by |
self.ctrl.writes.clear() # the run() method's try/finally |
self.ctrl.writes.clear() # the run() method's try/finally |
|
self.ctrl.current_listener = None # reset |
|
|
d(a) |
d(a) |
def testNoReadDuringCommit(self): |
def testNoReadDuringCommit(self): |
sp = self.ctrl.savepoint(); self.ctrl.has_run[self.t1] = self.t1 |
sp = self.ctrl.savepoint(); self.ctrl.has_run[self.t1] = self.t1 |
self.ctrl._process_writes(self.t1) |
self.ctrl._process_writes(self.t1) |
# Only t0 is notified, not t1, since t1 is the listener |
# Only t0 is notified, not t1, since t1 is the listener |
self.assertEqual(self.ctrl.last_notified, {self.t0: 1}) |
|
self.assertEqual(self.ctrl.queues, {2: {self.t0:1}}) |
self.assertEqual(self.ctrl.queues, {2: {self.t0:1}}) |
self.ctrl.rollback_to(sp) |
self.ctrl.rollback_to(sp) |
self.assertEqual(self.ctrl.last_notified, None) |
self.ctrl.current_listener = None # reset |
|
|
d(a) |
d(a) |
def testDependencyUpdatingAndUndo(self): |
def testDependencyUpdatingAndUndo(self): |
self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1]) |
self.assertEqual(list(self.t0.iter_subjects()), [self.s2, self.s1]) |
self.ctrl.rollback_to(sp) |
self.ctrl.rollback_to(sp) |
self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1]) |
self.assertEqual(list(self.t0.iter_subjects()), [s3, self.s1]) |
|
self.ctrl.current_listener = None # reset |
|
|
|
|
|
|
|
|
def runAs(self, listener, rule, initialized=True): |
def runAs(self, listener, rule): |
listener.run = rule |
listener.run = rule |
self.ctrl.run_rule(listener, initialized) |
self.ctrl.run_rule(listener) |
|
|
d(a) |
d(a) |
def testIsRunningAndHasRan(self): |
def testIsRunningAndHasRan(self): |
def rule(): |
def rule(): |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(self.ctrl.has_run, {self.t1: 0}) |
self.assertEqual(self.ctrl.has_run, {self.t1: self.t1}) |
|
sp = self.ctrl.savepoint() |
sp = self.ctrl.savepoint() |
self.runAs(self.t1, rule) |
self.runAs(self.t1, rule) |
self.assertEqual(self.ctrl.last_save, sp) |
|
self.assertEqual(self.ctrl.current_listener, None) |
self.assertEqual(self.ctrl.current_listener, None) |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(self.ctrl.has_run, {self.t1: 0}) |
self.assertEqual(self.ctrl.has_run, {self.t1: self.t1}) |
|
self.ctrl.rollback_to(sp) # should clear last_listener, last_save |
|
|
|
d(a) |
d(a) |
def testIsRunningButHasNotRan(self): |
def testIsRunningButHasNotRan(self): |
def rule(): |
def rule(): |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.last_listener, None) |
|
self.assertEqual(self.ctrl.has_run, {}) |
self.assertEqual(self.ctrl.has_run, {}) |
sp = self.ctrl.savepoint() |
sp = self.ctrl.savepoint() |
self.runAs(self.t1, rule, False) # uninit'd rule |
self.t1.run = rule; self.ctrl.initialize(self.t1) # uninit'd rule |
self.assertEqual(self.ctrl.last_save, None) |
|
self.assertEqual(self.ctrl.current_listener, None) |
self.assertEqual(self.ctrl.current_listener, None) |
self.assertEqual(self.ctrl.last_listener, None) |
|
self.assertEqual(self.ctrl.has_run, {}) |
self.assertEqual(self.ctrl.has_run, {}) |
self.ctrl.rollback_to(sp) # should clear last_listener, last_save |
|
|
|
d(a) |
|
def testClearLastListener(self): |
|
self.runAs(self.t1, lambda:1) |
|
self.assertEqual(self.ctrl.last_listener, self.t1) |
|
# last_listener should be cleared by cleanup() |
|
|
|
d(a) |
d(a) |
def testScheduleUndo(self): |
def testScheduleUndo(self): |
self.ctrl.rollback_to(sp) |
self.ctrl.rollback_to(sp) |
self.assertEqual(self.ctrl.queues, {}) |
self.assertEqual(self.ctrl.queues, {}) |
|
|
|
def testNestedReadOnly(self): |
|
log = [] |
|
def aRule(): |
|
log.append(trellis.ctrl.readonly); return 1 |
|
c1 = trellis.Cell(aRule) |
|
c2 = trellis.Cell(lambda: c1.value * aRule()) |
|
c3 = trellis.Performer(lambda: c2.value) |
|
self.assertEqual(log, [True, True]) |
|
|
d(a) |
d(a) |
def testWriteProcessingInRun(self): |
def testWriteProcessingInRun(self): |
self.runAs(self.t1, rule) |
self.runAs(self.t1, rule) |
# Only t0 is notified, not t1, since t1 is the listener & t3 is !dirty |
# Only t0 is notified, not t1, since t1 is the listener & t3 is !dirty |
self.assertEqual(self.ctrl.writes, {}) |
self.assertEqual(self.ctrl.writes, {}) |
self.assertEqual(self.ctrl.last_notified, {self.t0: 1}) |
|
self.assertEqual(self.ctrl.queues, {2: {self.t0:1}}) |
self.assertEqual(self.ctrl.queues, {2: {self.t0:1}}) |
self.ctrl.cancel(self.t0) |
self.ctrl.cancel(self.t0) |
|
|
self.assertEqual(self.ctrl.readonly, False) |
self.assertEqual(self.ctrl.readonly, False) |
|
|
|
|
|
|
d(a) |
d(a) |
def testRunClearsReadWriteOnError(self): |
def testRunClearsReadWriteOnError(self): |
def rule(): |
def rule(): |
try: |
try: |
self.ctrl._retry() |
self.ctrl._retry() |
except stm.CircularityError, e: |
except stm.CircularityError, e: |
self.assertEqual(e.args[0], |
self.assertEqual(e.args[0], {self.t0: set([self.t1]), |
{self.t0: set([self.t1]), self.t1: set([self.t2]), |
self.t1: set([self.t2]), self.t2: set([self.t0, self.t1])}) |
self.t2: set([self.t0, self.t1])}) |
self.assertEqual(e.args[1], (self.t1, self.t2, self.t0)) |
else: |
else: |
raise AssertionError("Should've caught a cycle") |
raise AssertionError("Should've caught a cycle") |
|
|
def testSimpleRetry(self): |
def testSimpleRetry(self): |
def rule(): |
def rule(): |
pass |
pass |
sp = self.ctrl.savepoint() |
|
self.runAs(self.t0, rule) |
self.runAs(self.t0, rule) |
self.runAs(self.t1, rule) |
self.runAs(self.t1, rule) |
self.runAs(self.t2, rule) |
self.runAs(self.t2, rule) |
|
self.assertEqual(set(self.ctrl.has_run),set([self.t0,self.t1,self.t2])) |
self.ctrl.to_retry[self.t1]=1 |
self.ctrl.to_retry[self.t1]=1 |
self.ctrl._retry(); self.ctrl.to_retry.clear() |
self.ctrl._retry() |
self.assertEqual(self.ctrl.last_listener, self.t0) |
self.assertEqual(set(self.ctrl.has_run), set([self.t0])) |
self.assertEqual(self.ctrl.last_save, sp) |
|
self.ctrl.to_retry[self.t0]=1 |
self.ctrl.to_retry[self.t0]=1 |
self.ctrl._retry(); self.ctrl.to_retry.clear() |
self.ctrl._retry() |
self.assertEqual(self.ctrl.last_save, None) |
|
|
|
d(a) |
d(a) |
def testNestedNoRetry(self): |
def testNestedNoRetry(self): |
def rule0(): |
def rule0(): |
self.runAs(self.t1, rule1, False) |
self.t1.run=rule1; self.ctrl.initialize(self.t1) |
def rule1(): |
def rule1(): |
pass |
pass |
self.runAs(self.t2, rule1) |
self.runAs(self.t2, rule1) |
self.runAs(self.t0, rule0) |
self.runAs(self.t0, rule0) |
self.ctrl.schedule(self.t1) |
self.ctrl.schedule(self.t1) |
self.assertEqual(self.ctrl.to_retry, {}) |
self.assertEqual(self.ctrl.to_retry, {}) |
self.assertEqual(self.ctrl.last_listener, self.t0) |
self.assertEqual( |
self.assertEqual(self.ctrl.to_retry, {}) |
set(self.ctrl.has_run), set([self.t0, self.t2]) |
|
) |
self.assertEqual(self.ctrl.queues, {1: {self.t1:1}}) |
self.assertEqual(self.ctrl.queues, {1: {self.t1:1}}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testRunScheduled(self): |
def testRunScheduled(self): |
log = [] |
log = [] |
self.t1.run = lambda: log.append(True) |
self.t1.run = lambda: log.append(True) |
self.assertEqual(log, [True]) |
self.assertEqual(log, [True]) |
|
|
|
|
|
|
def testRollbackReschedules(self): |
def testRollbackReschedules(self): |
sp = [] |
sp = [] |
def rule0(): |
def rule0(): |
sp.append(self.ctrl.savepoint()) |
sp.append(self.ctrl.savepoint()) |
self.ctrl.atomically(go) |
self.ctrl.atomically(go) |
|
|
def testManagerCanCreateLoop(self): |
def testManagerCantCreateLoop(self): |
class Mgr: |
class Mgr: |
def __enter__(self): pass |
def __enter__(self): pass |
def __exit__(*args): |
def __exit__(*args): |
self.t1.run = rule1 |
self.t1.run = rule1 |
self.t0.run = lambda:self.ctrl.manage(Mgr()) |
self.t0.run = lambda:self.ctrl.manage(Mgr()) |
self.ctrl.atomically(self.ctrl.schedule, self.t0) |
self.ctrl.atomically(self.ctrl.schedule, self.t0) |
|
self.assertEqual(log, []) |
|
self.ctrl.atomically(lambda:None) |
self.assertEqual(log, [True]) |
self.assertEqual(log, [True]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
d(a) |
d(a) |
def testNotifyOnChange(self): |
def testNotifyOnChange(self): |
stm.Link(self.s2, self.t2) |
stm.Link(self.s2, self.t2) |
self.assertEqual(self.ctrl.queues, {2: {self.t2:1}}) |
self.assertEqual(self.ctrl.queues, {2: {self.t2:1}}) |
self.ctrl.cancel(self.t2) |
self.ctrl.cancel(self.t2) |
self.ctrl.writes.clear() |
self.ctrl.writes.clear() |
|
self.ctrl.current_listener = None # reset |
|
|
|
|
|
|
def testCommitCanLoop(self): |
def testCommitCanLoop(self): |
log=[] |
log=[] |
self.ctrl.on_undo(undo) |
self.ctrl.on_undo(undo) |
self.ctrl.rollback_to(0) |
self.ctrl.rollback_to(0) |
|
|
|
d(a) |
|
def testReentrantRollbackToMinimumTarget(self): |
|
sp = self.ctrl.savepoint() |
|
# these 2 rollbacks will be ignored, since they target a higher sp. |
|
# note that both are needed for testing, as one is there to potentially |
|
# set a new target, and the other is there to make the offset wrong if |
|
# the rollback stops prematurely. |
|
self.ctrl.on_undo(self.ctrl.rollback_to, sp+100) |
|
self.ctrl.on_undo(self.ctrl.rollback_to, sp+100) |
|
sp2 = self.ctrl.savepoint() |
|
|
|
# ensure that there's no way this test can pass unless rollback_to |
|
# notices re-entrant invocations (because it would overflow the stack) |
|
for i in range(sys.getrecursionlimit()*2): |
|
# request a rollback all the way to 0; this target should be used |
|
# in place of the sp2 target or sp+100 targets, since it will be |
|
# the lowest target encountered during the rollback. |
|
self.ctrl.on_undo(self.ctrl.rollback_to, sp) |
|
|
|
self.ctrl.rollback_to(sp2) # ask to rollback to posn 2 |
|
self.assertEqual(self.ctrl.savepoint(), sp) # but should rollback to 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
d(a) |
d(a) |
def testNestedRule(self): |
def testNestedRule(self): |
def rule1(): |
def rule1(): |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1])) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.ctrl.used(self.s1) |
self.ctrl.used(self.s1) |
self.ctrl.changed(self.s2) |
self.ctrl.changed(self.s2) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1}) |
self.runAs(self.t2, rule2, False) |
self.t2.run=rule2; self.ctrl.initialize(self.t2) |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1])) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1, s3:self.t2}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1, s3:self.t2}) |
|
|
def rule2(): |
def rule2(): |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(set(self.ctrl.has_run), set([self.t0, self.t1])) |
self.assertEqual(self.ctrl.current_listener, self.t2) |
self.assertEqual(self.ctrl.current_listener, self.t2) |
self.assertEqual(self.ctrl.reads, {}) |
self.assertEqual(self.ctrl.reads, {}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1}) |
self.assertEqual(self.ctrl.writes, {self.s2:self.t1}) |
s3 = TestSubject(); s3.name = 's3' |
s3 = TestSubject(); s3.name = 's3' |
self.runAs(self.t0, rule0) |
self.runAs(self.t0, rule0) |
self.runAs(self.t1, rule1) |
self.runAs(self.t1, rule1) |
self.assertEqual(self.ctrl.has_run, |
self.assertEqual( |
{self.t1:self.t1, self.t0: self.t0} # t2 was new, so doesn't show |
set(self.ctrl.has_run), |
|
set([self.t1, self.t0]) # t2 was new, so doesn't show |
) |
) |
self.assertEqual(list(self.t1.iter_subjects()), [self.s1]) |
self.assertEqual(list(self.t1.iter_subjects()), [self.s1]) |
self.assertEqual(list(self.t2.iter_subjects()), [self.s2]) |
self.assertEqual(list(self.t2.iter_subjects()), [self.s2]) |
self.ctrl.rollback_to(self.ctrl.last_save) # should undo both t1/t2 |
self.ctrl.rollback_to(self.ctrl.has_run[self.t1]) # should undo both t1/t2 |
self.assertEqual(self.ctrl.last_listener, self.t0) |
|
|
|
|
|
|
|
|
|
|
def testUndoLogSpansMultipleRecalcs(self): |
|
c1 = trellis.Value(False, discrete=True) |
|
c2 = trellis.Cell(lambda: (c1.value, log.append(trellis.savepoint()))) |
|
log = []; c2.value; log = []; c1.value = True |
|
self.failUnless(len(log)==2 and log[1]>log[0], log) |
|
|
|
def testUndoPostCommitCancelsUndoOfCommitSchedule(self): |
|
c1 = trellis.Value(False, discrete=True) |
|
def c2(): |
|
c1.value |
|
log.append(trellis.savepoint()) |
|
if len(log)==2: |
|
raise DummyError |
|
c2 = trellis.Cell(c2) |
|
log = []; c2.value; log = []; |
|
# This will raise a different error if undoing the on-commit stack |
|
# causes an underflow: |
|
self.assertRaises(DummyError, setattr, c1, 'value', True) |
|
|
|
|
|
class TestTime(unittest.TestCase): |
|
|
|
def testIndependentNextEventTime(self): |
|
# Ensure that next_event_time() never returns a *past* time |
|
t = Time() |
|
t.auto_update = False |
|
t20 = t[20] |
|
t40 = t[40] |
|
d(trellis.Cell) |
|
def check_reached(): |
|
t.reached(t20) |
|
t.reached(t40) |
|
nt = t.next_event_time(True) |
|
self.failIf(nt is not None and nt<=0) |
|
check_reached.value |
|
t.advance(25) |
|
t.advance(15) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
d(a) |
|
def testPartialRollbackList(self): |
|
c1 = trellis.Cell(value=42) |
|
l = trellis.List() |
|
l.append(1) |
|
self.assertEqual(l.future, [1]) |
|
sp = self.ctrl.savepoint() |
|
self.ctrl.change_attr(self.ctrl, 'current_listener', c1) |
|
l.append(2) |
|
self.assertEqual(l.future, [1, 2]) |
|
self.ctrl.rollback_to(sp) |
|
self.assertEqual(l.future, [1]) |
|
|
|
d(a) |
|
def testPartialRollbackDict(self): |
|
c1 = trellis.Cell(lambda:None) |
|
d = trellis.Dict() |
|
d[1] = 2 |
|
self.assertEqual(d.added, {1:2}) |
|
sp = self.ctrl.savepoint() |
|
self.ctrl.change_attr(self.ctrl, 'current_listener', c1) |
|
d[2] = 3 |
|
self.assertEqual(d.added, {1:2, 2:3}) |
|
self.ctrl.rollback_to(sp) |
|
self.assertEqual(d.added, {1:2}) |
|
|
|
d(a) |
|
def testPartialRollbackSet(self): |
|
c1 = trellis.Cell(lambda:None) |
|
s = trellis.Set() |
|
s.add(1) |
|
self.assertEqual(list(s.added), [1]) |
|
sp = self.ctrl.savepoint() |
|
self.ctrl.change_attr(self.ctrl, 'current_listener', c1) |
|
s.add(2) |
|
self.assertEqual(list(s.added), [1, 2]) |
|
self.ctrl.rollback_to(sp) |
|
self.assertEqual(list(s.added), [1]) |
|
|
|
|
|
|
|
def run_modifier_and_rule(self, func, rule): |
|
d(self.ctrl.atomically) |
|
def go(): |
|
self.ctrl.schedule(trellis.Cell(rule)) |
|
func.sp = self.ctrl.savepoint() |
|
trellis.modifier(func)() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testSetShouldOverrideInitialCalculatedValue(self): |
|
class C(trellis.Component): |
|
trellis.maintain(optional=True) |
|
def calc(self): |
|
return 0 |
|
trellis.maintain() |
|
def getx(self): |
|
self.calc |
|
trellis.maintain() |
|
def set(self): |
|
# This should not conflict with .calc setting itself to 0 |
|
self.calc = 1 |
|
def __init__(self): |
|
self.getx |
|
self.set |
|
c = C() |
|
|
|
def testMakeDuringPerform(self): |
|
class C1(trellis.Component): |
|
x = trellis.attr() |
|
trellis.maintain() |
|
def rule(self): |
|
self.x = 1 |
|
|
|
class C2(trellis.Component): |
|
c1 = trellis.make(C1) |
|
trellis.compute() |
|
def calc(self): |
|
return self.c1.x |
|
C2().calc |
|
|
|
def __testMaintainReassign(self): |
|
class C(trellis.Component): |
|
x = trellis.attr() |
|
trellis.maintain() |
|
def rule(self): |
|
self.x = 10 |
|
d(trellis.atomically) |
|
def test(): |
|
C(x = 1) |
|
|
|
def testFalsePositiveDepCycle(self): |
|
|
|
c1 = trellis.Cell(value=1) |
|
|
|
d(trellis.Cell) |
|
def c2(): |
|
return c1.value+1 |
|
|
|
d(trellis.Cell) |
|
def c3(): |
|
return c1.value+c2.value |
|
|
|
self.assertEqual(c3.value, 3) |
|
|
|
d(trellis.Cell) |
|
def c5(): |
|
c1.value = 27 |
|
|
|
d(trellis.atomically) |
|
def doit(): |
|
c5.value |
|
for c in c2, c3: |
|
trellis.ctrl.has_run.setdefault(c, 1) |
|
trellis.on_undo(trellis.ctrl.has_run.pop, c) |
|
trellis.ctrl.to_retry.setdefault(c, 1) |
|
trellis.on_undo(trellis.ctrl._unrun, c2, [c3]) |
|
trellis.ctrl._retry() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestDefaultEventLoop(unittest.TestCase): |
class TestDefaultEventLoop(unittest.TestCase): |
|
|
def setUp(self): |
def setUp(self): |
self.loop.flush() |
self.loop.flush() |
self.assertEqual(log, [1, 3]) |
self.assertEqual(log, [1, 3]) |
|
|
|
def testScheduleUndo(self): |
|
t = Time() |
|
t.auto_update = False |
|
t20 = t[20] |
|
log = [] |
|
d(trellis.Cell) |
|
def checktime(): |
|
t.reached(t20) |
|
log.append(t._events[t20._when]) |
|
d(trellis.Performer) |
|
def err_after_reached(): |
|
if len(t._schedule)>1: |
|
raise DummyError |
|
self.assertRaises(DummyError, checktime.get_value) |
|
self.assertEqual(t._schedule, [t20._when, Max]) |
|
self.assertEqual(dict(t._events), {t20._when:log[0]}) |
|
del checktime |
|
self.failUnless(isinstance(log.pop(), trellis.Sensor)) |
|
self.assertEqual(dict(t._events), {}) |
|
self.assertEqual(log, []) |
|
|
|
def force_rollback(self): |
|
d(trellis.Performer) |
|
def do_it(): |
|
raise DummyError |
|
|
|
|
|
|
|
def testUpdateUndo(self): |
|
t = Time() |
|
t.auto_update = False |
|
t20 = t[20] |
|
d(trellis.Cell) |
|
def checktime(): |
|
if t.reached(t20): |
|
self.force_rollback() |
|
checktime.value |
|
self.assertEqual(t._schedule, [t20._when, Max]) |
|
self.assertEqual(list(t._events), [t20._when]) |
|
self.assertRaises(DummyError, t.advance, 20) |
|
self.assertEqual(t._schedule, [t20._when, Max]) |
|
self.assertEqual(list(t._events), [t20._when]) |
|
|
|
|
|
|
|
|
|
|
def f(): |
def f(): |
self.failUnless(self.ctrl.active) |
self.failUnless(self.ctrl.active) |
log.append(1) |
log.append(1) |
yield trellis.Pause |
yield activity.Pause |
self.failUnless(self.ctrl.active) |
self.failUnless(self.ctrl.active) |
log.append(2) |
log.append(2) |
t = trellis.TaskCell(f) |
t = activity.TaskCell(f) |
self.assertEqual(log, []) |
self.assertEqual(log, []) |
t._loop.flush() |
t._loop.flush() |
self.assertEqual(log, [1]) |
self.assertEqual(log, [1]) |
while v.value: |
while v.value: |
log.append(v.value) |
log.append(v.value) |
v1.value = v.value |
v1.value = v.value |
yield trellis.Pause |
yield activity.Pause |
t = trellis.TaskCell(f) |
t = activity.TaskCell(f) |
check = [] |
check = [] |
for j in 42, 57, 99, 106, 23, None: |
for j in 42, 57, 99, 106, 23, None: |
self.assertEqual(log, check) |
self.assertEqual(log, check) |
def testPauseAndCall(self): |
def testPauseAndCall(self): |
log = [] |
log = [] |
class TaskExample(trellis.Component): |
class TaskExample(trellis.Component): |
trellis.values( |
trellis.attrs( |
start = False, |
start = False, |
stop = False |
stop = False |
) |
) |
def wait_for_start(self): |
def wait_for_start(self): |
log.append("waiting to start") |
log.append("waiting to start") |
while not self.start: |
while not self.start: |
yield trellis.Pause |
yield activity.Pause |
|
|
def wait_for_stop(self): |
def wait_for_stop(self): |
while not self.stop: |
while not self.stop: |
log.append("waiting to stop") |
log.append("waiting to stop") |
yield trellis.Pause |
yield activity.Pause |
|
|
d(trellis.task) |
activity.task() |
def demo(self): |
def demo(self): |
yield self.wait_for_start() |
yield self.wait_for_start() |
log.append("starting") |
log.append("starting") |
def f1(): |
def f1(): |
yield 42 |
yield 42 |
def f2(): |
def f2(): |
yield f1(); yield trellis.resume() |
yield f1(); yield activity.resume() |
def f3(): |
def f3(): |
yield f2(); v = trellis.resume() |
yield f2(); v = activity.resume() |
log.append(v) |
log.append(v) |
|
|
t = trellis.TaskCell(f3) |
t = activity.TaskCell(f3) |
EventLoop.flush() |
EventLoop.flush() |
self.assertEqual(log, [42]) |
self.assertEqual(log, [42]) |
|
|
log = [] |
log = [] |
def f1(): |
def f1(): |
yield trellis.Return(42) |
yield activity.Return(42) |
|
|
t = trellis.TaskCell(f3) |
t = activity.TaskCell(f3) |
EventLoop.flush() |
EventLoop.flush() |
self.assertEqual(log, [42]) |
self.assertEqual(log, [42]) |
|
|
raise DummyError |
raise DummyError |
def f2(): |
def f2(): |
try: |
try: |
yield f1(); trellis.resume() |
yield f1(); activity.resume() |
except DummyError: |
except DummyError: |
log.append(True) |
log.append(True) |
else: |
else: |
pass |
pass |
|
|
t = trellis.TaskCell(f2) |
t = activity.TaskCell(f2) |
self.assertEqual(log, []) |
self.assertEqual(log, []) |
EventLoop.flush() |
EventLoop.flush() |
self.assertEqual(log, [True]) |
self.assertEqual(log, [True]) |
|
|
def throw(self, typ, val, tb): |
def throw(self, typ, val, tb): |
log.append(typ) |
log.append(typ) |
log.append(type(val)) |
log.append(val.__class__) # type(val) is instance in Py<2.5 |
log.append(type(tb)) |
log.append(type(tb)) |
raise StopIteration |
raise StopIteration |
|
|
def fs(): yield SendThrowIter() |
def fs(): yield SendThrowIter() |
t = trellis.TaskCell(fs) |
t = activity.TaskCell(fs) |
self.assertEqual(log, []) |
self.assertEqual(log, []) |
EventLoop.flush() |
EventLoop.flush() |
self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType]) |
self.assertEqual(log, [99, DummyError,DummyError, types.TracebackType]) |
|
|
|
|
|
|
def additional_tests(): |
def testResumeOnlyOnceUntilFlushed(self): |
import doctest, sys |
log = [] |
files = [ |
c1 = trellis.Value(1) |
'README.txt', 'STM-Observer.txt', 'Collections.txt', 'Internals.txt', |
c2 = trellis.Value(2) |
'Specification.txt', |
def f(): |
][(sys.version<'2.4')*3:] # README.txt uses decorator syntax |
for i in range(3): |
return doctest.DocFileSuite( |
c1.value, c2.value |
optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files |
log.append(i) |
) |
yield activity.Pause |
|
|
|
t = activity.TaskCell(f) |
|
self.assertEqual(log, []) |
|
EventLoop.flush() |
|
self.assertEqual(log, [0]) |
|
c1.value = 3 |
|
self.assertEqual(log, [0]) |
|
c2.value = 4 |
|
EventLoop.flush() |
|
self.assertEqual(log, [0, 1]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testNoTodoRollbackIntoTask(self): |
|
class CV(trellis.Component): |
|
v = trellis.attr(False) |
|
s = trellis.make(trellis.Set) |
|
|
|
trellis.maintain() |
|
def maintain(self): |
|
if self.v: |
|
self.s.add(1) |
|
else: |
|
self.s.discard(1) |
|
|
|
trellis.perform() |
|
def perform(s): |
|
self.assertEqual(s.v, True) |
|
|
|
d(activity.TaskCell) |
|
def task(): |
|
cv = CV() |
|
cv.v = True |
|
yield activity.Pause |
|
|
|
EventLoop.run() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def additional_tests(): |
|
import doctest, sys |
|
files = [ |
|
'README.txt', 'STM-Observer.txt', 'Activity.txt', 'Collections.txt', |
|
'Internals.txt', |
|
][(sys.version<'2.4')*3:] # All but Internals+Collections use decorator syntax |
|
try: |
|
from sqlalchemy.orm.attributes import ClassManager |
|
except ImportError: |
|
pass |
|
else: |
|
files.insert(0, 'SQLAlchemy.txt') |
|
return doctest.DocFileSuite( |
|
optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, *files |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|