def configure_context(self): |
def configure_context(self): |
pass |
pass |
|
|
class TestListener(stm.AbstractListener): pass |
class TestListener(stm.AbstractListener): |
class TestSubject(stm.AbstractSubject): pass |
def __repr__(self): return self.name |
|
class TestSubject(stm.AbstractSubject): |
|
def __repr__(self): return self.name |
class DummyError(Exception): pass |
class DummyError(Exception): pass |
class UndirtyListener(TestListener): |
class UndirtyListener(TestListener): |
def dirty(self): |
def dirty(self): |
return False |
return False |
|
|
|
|
|
|
|
|
if wx: |
if wx: |
class TestWxEventLoop(EventLoopTestCase): |
class TestWxEventLoop(EventLoopTestCase): |
def configure_context(self): |
def configure_context(self): |
class TestLinks(unittest.TestCase): |
class TestLinks(unittest.TestCase): |
|
|
def setUp(self): |
def setUp(self): |
self.l1 = TestListener() |
self.l1 = TestListener(); self.l1.name = 'l1' |
self.l2 = TestListener() |
self.l2 = TestListener(); self.l1.name = 'l2' |
self.s1 = TestSubject() |
self.s1 = TestSubject(); self.s1.name = 's1' |
self.s2 = TestSubject() |
self.s2 = TestSubject(); self.s2.name = 's2' |
self.lk11 = stm.Link(self.s1, self.l1) |
self.lk11 = stm.Link(self.s1, self.l1) |
self.lk12 = stm.Link(self.s1, self.l2) |
self.lk12 = stm.Link(self.s1, self.l2) |
self.lk21 = stm.Link(self.s2, self.l1) |
self.lk21 = stm.Link(self.s2, self.l1) |
|
|
def setUp(self): |
def setUp(self): |
self.ctrl = stm.Controller() |
self.ctrl = stm.Controller() |
self.t0 = TestListener() |
self.t0 = TestListener(); self.t0.name='t0'; |
self.t1 = TestListener(); self.t1.layer = 1 |
self.t1 = TestListener(); self.t1.name='t1'; self.t1.layer = 1 |
self.t2 = TestListener(); self.t2.layer = 2 |
self.t2 = TestListener(); self.t2.name='t2'; self.t2.layer = 2 |
self.t3 = UndirtyListener() |
self.t3 = UndirtyListener(); self.t3.name='t3' |
self.s1 = TestSubject(); self.s2 = TestSubject() |
self.s1 = TestSubject(); self.s2 = TestSubject() |
|
self.s1.name = 's1'; self.s2.name = 's2' |
|
|
def tearDown(self): |
def tearDown(self): |
# Verify correct cleanup in all scenarios |
# Verify correct cleanup in all scenarios |
undo=[], managers={}, queues={}, layers=[], reads={}, writes={}, |
undo=[], managers={}, queues={}, layers=[], reads={}, writes={}, |
has_run={}, last_listener=None, last_notified=None, last_save=None, |
has_run={}, last_listener=None, last_notified=None, last_save=None, |
current_listener=None, readonly=False, in_cleanup=False, |
current_listener=None, readonly=False, in_cleanup=False, |
active=False, at_commit=[] |
active=False, at_commit=[], to_retry={} |
).items(): |
).items(): |
val = getattr(self.ctrl, k) |
val = getattr(self.ctrl, k) |
self.assertEqual(val, v, '%s: %r' % (k,val)) |
self.assertEqual(val, v, '%s: %r' % (k,val)) |
|
|
def testScheduleSimple(self): |
def testScheduleSimple(self): |
t1 = TestListener() |
t1 = TestListener(); t1.name='t1' |
t2 = TestListener() |
t2 = TestListener(); t2.name='t2' |
self.assertEqual(self.ctrl.layers, []) |
self.assertEqual(self.ctrl.layers, []) |
self.assertEqual(self.ctrl.queues, {}) |
self.assertEqual(self.ctrl.queues, {}) |
self.ctrl.schedule(t1) |
self.ctrl.schedule(t1) |
self.failUnless(isinstance(stm.ctrl, stm.Controller)) |
self.failUnless(isinstance(stm.ctrl, stm.Controller)) |
self.failUnless(isinstance(stm.ctrl, stm.threading.local)) |
self.failUnless(isinstance(stm.ctrl, stm.threading.local)) |
|
|
|
|
def testHeapingCancel(self): |
def testHeapingCancel(self): |
# verify that cancelling the last listener of a layer keeps |
# verify that cancelling the last listener of a layer keeps |
# the 'layers' list in heap order |
# the 'layers' list in heap order |
self.runAs(self.t1, rule1) |
self.runAs(self.t1, rule1) |
self.runAs(self.t2, rule2) |
self.runAs(self.t2, rule2) |
try: |
try: |
self.runAs(self.t0, rule0) |
self.ctrl._retry() |
except stm.CircularityError, e: |
except stm.CircularityError, e: |
self.assertEqual(e.args[0], |
self.assertEqual(e.args[0], |
{self.t0: set([self.t1]), self.t1: set([self.t2]), |
{self.t0: set([self.t1]), self.t1: set([self.t2]), |
self.t2: set([self.t0])}) |
self.t2: set([self.t0, self.t1])}) |
else: |
else: |
raise AssertionError("Should've caught a cycle") |
raise AssertionError("Should've caught a cycle") |
|
|
self.runAs(self.t0, rule) |
self.runAs(self.t0, rule) |
self.runAs(self.t1, rule) |
self.runAs(self.t1, rule) |
self.runAs(self.t2, rule) |
self.runAs(self.t2, rule) |
self.ctrl._retry(self.t1) |
self.ctrl.to_retry[self.t1]=1 |
|
self.ctrl._retry(); self.ctrl.to_retry.clear() |
self.assertEqual(self.ctrl.last_listener, self.t0) |
self.assertEqual(self.ctrl.last_listener, self.t0) |
self.assertEqual(self.ctrl.last_save, sp) |
self.assertEqual(self.ctrl.last_save, sp) |
self.ctrl._retry(self.t0) |
self.ctrl.to_retry[self.t0]=1 |
|
self.ctrl._retry(); self.ctrl.to_retry.clear() |
self.assertEqual(self.ctrl.last_save, None) |
self.assertEqual(self.ctrl.last_save, None) |
|
|
d(a) |
d(a) |
def rule1(): |
def rule1(): |
pass |
pass |
def rule2(): |
def rule2(): |
raise AssertionError("I should not be run") |
pass #raise AssertionError("I should not be run") |
self.runAs(self.t2, rule1) |
self.runAs(self.t2, rule1) |
self.runAs(self.t0, rule0) |
self.runAs(self.t0, rule0) |
self.runAs(self.t1, rule2) |
self.ctrl.schedule(self.t1) |
|
self.assertEqual(self.ctrl.to_retry, {self.t0:1}) |
|
self.ctrl._retry() |
self.assertEqual(self.ctrl.last_listener, self.t2) |
self.assertEqual(self.ctrl.last_listener, self.t2) |
self.assertEqual(self.ctrl.queues, {0: {self.t0:1}}) |
self.assertEqual(self.ctrl.queues, {}) |
self.ctrl.cancel(self.t0) |
|
|
|
def testRunScheduled(self): |
def testRunScheduled(self): |
log = [] |
log = [] |
self.assertEqual(log, [True]) |
self.assertEqual(log, [True]) |
|
|
|
|
|
|
|
|
|
|
def testRollbackReschedules(self): |
def testRollbackReschedules(self): |
sp = [] |
sp = [] |
def rule0(): |
def rule0(): |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:1}) |
self.runAs(self.t2, rule2) |
self.runAs(self.t2, rule2) |
self.assertEqual(self.ctrl.last_listener, self.t2) |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t1) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.reads, {self.s1:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:1, s3:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:1, s3:1}) |
|
|
def rule2(): |
def rule2(): |
self.assertEqual(self.ctrl.last_listener, self.t2) |
self.assertEqual(self.ctrl.last_listener, self.t1) |
self.assertEqual(self.ctrl.current_listener, self.t2) |
self.assertEqual(self.ctrl.current_listener, self.t2) |
self.assertEqual(self.ctrl.reads, {}) |
self.assertEqual(self.ctrl.reads, {}) |
self.assertEqual(self.ctrl.writes, {self.s2:1}) |
self.assertEqual(self.ctrl.writes, {self.s2:1}) |
self.assertEqual(v._set_by, None) |
self.assertEqual(v._set_by, None) |
def go(): |
def go(): |
v.value = 99 |
v.value = 99 |
t = TestListener() |
t = TestListener(); t.name = 't' |
t.run = go |
t.run = go |
self.assertRaises(stm.InputConflict, self.ctrl.run, t) |
self.assertRaises(stm.InputConflict, self.ctrl.run, t) |
self.assertEqual(v.value, 43) |
self.assertEqual(v.value, 43) |
def go(): |
def go(): |
v.value = 43 |
v.value = 43 |
t = TestListener() |
t = TestListener(); t.name = 't' |
t.run = go |
t.run = go |
self.ctrl.run(t) |
self.ctrl.run(t) |
self.assertEqual(v.value, 43) |
self.assertEqual(v.value, 43) |