# View of /Trellis/peak/events/stm.py
# Parent Directory | Revision Log
# Revision: 2436 - (download) (as text)
# Fri Nov 30 22:46:43 2007 UTC (16 years, 5 months ago) by pje
# File size: 15334 byte(s)
# The new algorithm lands - version bump to 0.6a1dev.
# Too many changes to list!
"""Software Transactional Memory and Observers"""
import weakref, sys, heapq, UserList, UserDict, sets
from peak.util.extremes import Max
from peak.util import decorators
# Use ``dummy_threading`` as a stand-in when ``threading`` can't be imported,
# so that ``threading.local`` (used by ``LocalController``) is always available
try:
    import threading
except ImportError:
    import dummy_threading as threading
# Names exported by ``from <this module> import *``
__all__ = [
    'STMHistory', 'AbstractSubject', 'Link', 'AbstractListener', 'Controller',
    'CircularityError', 'LocalController',
]
class CircularityError(Exception):
    """Raised when rules keep re-triggering one another in a cycle"""
class AbstractSubject(object):
    """Base class for objects that listeners can be linked to

    A subject is the head of a chain of ``Link`` objects reachable through
    its ``next_listener`` attribute.  No slots are declared here, so concrete
    subclasses must supply storage for ``next_listener`` themselves.
    """

    __slots__ = ()

    # Class-level defaults, overridable by subclasses or instances
    manager = None
    layer = 0

    def __init__(self):
        self.next_listener = None

    def iter_listeners(self):
        """Generate every still-alive listener linked to this subject"""
        node = self.next_listener
        while node is not None:
            # Fetch the successor and the link's target before yielding, so
            # that a consumer unlinking ``node`` while this generator is
            # suspended can't derail the traversal
            node, listener = node.next_listener, node()
            if listener is not None:
                yield listener
class AbstractListener(object):
    """Abstract base for objects that listen to subjects via ``Link`` objects

    A listener is the head of a chain of ``Link`` objects reachable through
    its ``next_subject`` attribute.  No slots are declared here, so concrete
    subclasses must supply storage for ``next_subject``; since ``Link`` holds
    its listener through a weak reference, instances must also be weakly
    referenceable.
    """

    __slots__ = ()

    layer = 0

    def __init__(self):
        self.next_subject = None

    def iter_subjects(self):
        """Yield the subjects of this listener

        Links whose ``subject`` is ``None`` are skipped.
        """
        link = self.next_subject
        while link is not None:
            nxt = link.next_subject   # avoid unlinks breaking iteration
            if link.subject is not None:
                yield link.subject
            link = nxt

    def dirty(self):
        """Mark the listener dirty and query whether it should be scheduled

        If a true value is returned, the listener should be scheduled.  Note
        that this method is allowed to have side-effects, but must be
        idempotent.

        This base implementation has no side-effects and always returns
        ``True``.
        """
        return True

    def run(self):
        """Take whatever action the listener is supposed to take

        Must be overridden; this base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError
class Link(weakref.ref):
    """Dependency link between a subject and a listener

    A link is a weak reference to its listener that is simultaneously a node
    in two doubly-linked lists:

    * the subject's list of listeners (``next_listener``/``prev_listener``),
      whose head is the subject's own ``next_listener`` attribute, and

    * the listener's list of subjects (``next_subject``/``prev_subject``),
      whose head is the listener's own ``next_subject`` attribute.

    The subject is referenced strongly (``subject``); the listener only
    weakly.  When the listener is garbage collected, the weak reference
    callback unlinks the link automatically.
    """

    __slots__ = [
        'subject','next_subject','prev_subject','next_listener','prev_listener'
    ]

    def __new__(cls, subject, listener):
        """Create a link and insert it at the head of both lists"""
        # Use `cls` rather than a hard-coded `Link`, so that subclasses
        # actually get instances of themselves
        self = weakref.ref.__new__(cls, listener, _unlink_fn)

        # The first link in a subject's list points back at the subject
        self.subject = self.prev_listener = subject
        self.prev_subject = None    # listener link is via weak ref

        nxt = self.next_subject = listener.next_subject
        if nxt is not None:
            nxt.prev_subject = self

        nxt = self.next_listener = subject.next_listener
        if nxt is not None:
            nxt.prev_listener = self

        listener.next_subject = self
        subject.next_listener = self
        return self

    def unlink(self):
        """Deactivate the link and remove it from its lists"""
        # Remove from the subject's list of listeners; `prev` is either the
        # preceding link or (for the first link) the subject itself
        nxt = self.next_listener
        prev = self.prev_listener
        if nxt is not None:
            nxt.prev_listener = prev
        if prev is not None and prev.next_listener is self:
            prev.next_listener = nxt

        # Remove from the listener's list of subjects; the first link has no
        # `prev_subject`, so the list head must be found via the weak ref
        prev = self.prev_subject
        nxt = self.next_subject
        if nxt is not None:
            nxt.prev_subject = prev
        if prev is None:
            prev = self()   # get head of list (None if the listener is gone)
        if prev is not None and prev.next_subject is self:
            prev.next_subject = nxt

        # Drop all references, so that unlinking again is harmless
        self.subject = self.next_subject = self.prev_subject = None
        self.next_listener = self.prev_listener = None

# Weak reference callback: invoked with the link when its listener dies
_unlink_fn = Link.unlink
class STMHistory(object):
"""Simple STM implementation using undo logging and context managers"""
active = in_cleanup = undoing = False
def __init__(self):
self.undo = [] # [(func,args), ...]
self.at_commit =[] # [(func,args), ...]
self.managers = {} # [mgr]->seq # (context managers to __exit__ with)
def atomically(self, func=lambda:None, *args, **kw):
"""Invoke ``func(*args,**kw)`` atomically"""
if self.active:
return func(*args, **kw)
self.active = True
try:
try:
retval = func(*args, **kw)
self.cleanup()
return retval
except:
self.cleanup(*sys.exc_info())
finally:
self.active = False
def manage(self, mgr):
assert self.active, "Can't manage without active history"
if mgr not in self.managers:
mgr.__enter__()
self.managers[mgr] = len(self.managers)
def on_undo(self, func, *args):
"""Call `func(*args)` if atomic operation is undone"""
assert self.active, "Can't record undo without active history"
if not self.undoing: self.undo.append((func, args))
def savepoint(self):
"""Get a savepoint suitable for calling ``rollback_to()``"""
return len(self.undo)
def rollback_to(self, sp=0):
"""Rollback to the specified savepoint"""
assert self.active, "Can't rollback without active history"
undo = self.undo; self.undoing = True
try:
while len(undo) > sp: f, a = undo.pop(); f(*a)
finally: self.undoing = False
def cleanup(self, typ=None, val=None, tb=None):
# Exit the processing loop, unwinding managers
assert self.active, "Can't exit when inactive"
assert not self.in_cleanup, "Can't invoke cleanup while in cleanup"
self.in_cleanup = True
if typ is None:
try:
for (f,a) in self.at_commit: f(*a)
except:
typ, val, tb = sys.exc_info()
if typ is not None:
try:
self.rollback_to(0)
except:
typ, val, tb = sys.exc_info()
managers = [(posn,mgr) for (mgr, posn) in self.managers.items()]
managers.sort()
self.managers.clear()
try:
while managers:
try:
managers.pop()[1].__exit__(typ, val, tb)
except:
typ, val, tb = sys.exc_info()
if typ is not None:
raise typ, val, tb
finally:
del self.at_commit[:], self.undo[:]
self.in_cleanup = False
typ = val = tb = None
def change_attr(self, ob, attr, val):
"""Set `ob.attr` to `val`, w/undo log to restore the previous value"""
self.on_undo(setattr, ob, attr, getattr(ob, attr))
setattr(ob, attr, val)
def on_commit(self, func, *args):
"""Call `func(*args)` if atomic operation is committed"""
assert self.active, "Not in an atomic operation"
self.at_commit.append((func, args))
self.undo.append((self.at_commit.pop,()))
class Controller(STMHistory):
    """STM History with support for subjects, listeners, and queueing"""

    # Bookkeeping about the rule being run and the one most recently run.
    # `run_rule()` sets `last_save` and `last_listener`, and
    # `_process_writes()` sets `last_notified`, all through `change_attr()`,
    # so that `rollback_to()` restores their earlier values.
    last_listener = current_listener = last_notified = last_save = None

    # True while a listener whose layer is `Max` is running (see `run_rule()`)
    readonly = False

    def __init__(self):
        super(Controller, self).__init__()
        # subjects read so far by the running listener (see `used()`)
        self.reads = {}
        # [subject] -> listener that changed it (see `changed()`)
        self.writes = {}
        self.has_run = {} # listeners that have run
        self.layers = [] # heap of layer numbers
        self.queues = {} # [layer] -> dict of listeners to be run
        # listeners that were rescheduled after running (see `schedule()`)
        self.to_retry = {}
        # `pulse` is incremented on each pass of the `_process()` loop.
        # NOTE(review): imported here rather than at module level, presumably
        # to avoid a circular import with `peak.events.trellis` -- confirm
        from peak.events.trellis import Value; self.pulse = Value(0)

    def cleanup(self, *args):
        """Forget which listeners have run, then do the inherited cleanup

        The current/last listener bookkeeping is reset afterwards, whether
        or not the inherited cleanup raises.
        """
        try:
            self.has_run.clear()
            return super(Controller, self).cleanup(*args)
        finally:
            self.current_listener = self.last_listener = None
            self.last_notified = self.last_save = None

    def _retry(self):
        """Roll back the listeners in `to_retry`, then reschedule them

        Rule runs are undone one at a time, most recent first, until every
        listener in `to_retry` has been rolled back.  Raises
        ``CircularityError`` if one of those listeners turns out to have
        (directly or indirectly) notified a listener being retried.
        `to_retry` is always emptied on the way out.
        """
        try: # undo back through listener, watching to detect cycles
            todo = self.to_retry.copy(); destinations = set(todo)
            routes = {} # tree of rules that (re)triggered the original listener
            while todo:
                this = self.last_listener
                if self.last_notified:
                    via = destinations.intersection(self.last_notified)
                    if via:
                        routes[this] = via; destinations.add(this)
                # undoing this run also restores the previous values of
                # `last_listener`, `last_save` and `last_notified`
                self.rollback_to(self.last_save)
                if this in todo: del todo[this]
            for item in self.to_retry:
                if item in routes: raise CircularityError(routes)
            else:
                # no cycle was found: put every retried listener back in line
                map(self.schedule, self.to_retry)
        finally:
            self.to_retry.clear()

    def run_rule(self, listener, initialized=True):
        """Run the specified listener

        When called while another listener is running, `initialized` must be
        false; the nested listener then gets its own, temporary `reads`.
        Otherwise, the listener's writes and reads are processed after it
        runs, and for an `initialized` listener the run is also recorded
        (undoably) in `has_run`, `last_save` and `last_listener`.
        """
        old = self.current_listener
        self.current_listener = listener
        if listener.layer is Max:
            self.readonly = True
        try:
            assert listener not in self.has_run,"Re-run of rule without retry"
            if old is not None:
                assert not initialized,"Only un-initialized rules can be nested"
                # keep the outer listener's reads separate from the nested one's
                old_reads, self.reads = self.reads, {}
                try:
                    listener.run()
                    self._process_reads(listener, initialized)
                finally:
                    self.reads = old_reads
            else:
                if initialized:
                    # remember where this run starts, so `_retry()` can undo it
                    self.change_attr(self, 'last_save', self.savepoint())
                    self.change_attr(self, 'last_listener', listener)
                    self.has_run[listener] = listener
                    self.on_undo(self.has_run.pop, listener, None)
                try:
                    listener.run()
                    self._process_writes(listener)
                    self._process_reads(listener, initialized)
                except:
                    # discard whatever the failed run had recorded
                    self.reads.clear()
                    self.writes.clear()
                    raise
        finally:
            self.current_listener = old
            self.readonly = False

    def _process_writes(self, listener):
        #
        # Remove changed items from self.writes and notify their listeners,
        # keeping a record in self.last_notified so that we can figure out
        # later what caused a cyclic dependency (if one happens).
        #
        notified = {}
        self.change_attr(self, 'last_notified', notified)
        writes = self.writes
        layer = listener.layer
        has_run = self.has_run
        while writes:
            subject, writer = writes.popitem()
            for dependent in subject.iter_listeners():
                # skip `listener` itself if it's recorded as having run
                if has_run.get(dependent) is not listener:
                    if dependent.dirty():
                        self.schedule(dependent, layer, writer)
                        notified[dependent] = 1

    def _process_reads(self, listener, undo=True):
        #
        # Remove subjects from self.reads and link them to `listener`
        # (Old subjects of the listener are deleted, and self.reads is cleared
        #
        # If `undo` is true, each link added or removed is also undo-logged
        # (directly on `self.undo`, bypassing `on_undo()`).
        #
        subjects = self.reads
        link = listener.next_subject
        while link is not None:
            nxt = link.next_subject # avoid unlinks breaking iteration
            if link.subject in subjects:
                # already linked: nothing to add for this subject
                del subjects[link.subject]
            else:
                # no longer read: drop the link (undo would re-create it)
                if undo: self.undo.append((Link, (link.subject, listener)))
                link.unlink()
            link = nxt
        while subjects:
            # whatever remains was read for the first time: link it
            link = Link(subjects.popitem()[0], listener)
            if undo: self.undo.append((link.unlink, ()))

    def schedule(self, listener, source_layer=None, writer=None):
        """Schedule `listener` to run during an atomic operation

        If an operation is already in progress, it's immediately scheduled, and
        its scheduling is logged in the undo queue (unless it was already
        scheduled).

        If `source_layer` is specified, ensure that the listener belongs to
        a higher layer than the source, moving the listener from an existing
        queue layer if necessary.  (This layer elevation is intentionally
        NOT undo-logged, however.)

        `writer` is accepted, but not used by this implementation.
        """
        new = old = listener.layer
        get = self.queues.get
        assert not self.readonly or old is Max, \
            "Shouldn't be scheduling a non-Observer during commit"
        if source_layer is not None and source_layer >= listener.layer:
            new = source_layer + 1
        if listener in self.has_run:
            # it already ran during this operation, so it must be re-run
            self.to_retry[self.has_run[listener]]=1
        q = get(old)
        if q and listener in q:
            if new is not old:
                # already queued, but in a layer that's now too low
                self.cancel(listener)
        elif self.active and not self.undoing:
            self.on_undo(self.cancel, listener)
        if new is not old:
            listener.layer = new
            q = get(new)
        if q is None:
            q = self.queues[new] = {listener:1}
            heapq.heappush(self.layers, new)
        else:
            q[listener] = 1

    def cancel(self, listener):
        """Prevent the listener from being recalculated, if applicable"""
        q = self.queues.get(listener.layer)
        if q and listener in q:
            del q[listener]
            if not q:
                # that was the layer's last listener: drop the layer as well
                del self.queues[listener.layer]
                self.layers.remove(listener.layer)
                self.layers.sort() # preserve heap order

    def atomically(self, func=lambda:None, *args, **kw):
        """Invoke ``func(*args,**kw)`` atomically"""
        if self.active:
            return func(*args, **kw)
        # route the call through `_process()`, so scheduled listeners get run
        return super(Controller,self).atomically(self._process, func, args, kw)

    def _process(self, func, args, kw):
        """Call `func`, then run scheduled listeners until none are left

        Listeners are taken from the lowest-numbered layer first.  Each pass
        of the outer loop increments `pulse` and ends with a `cleanup()`.  On
        any error, the layer heap and queues are emptied before the error is
        re-raised.  Returns the result of `func`.
        """
        try:
            retval = func(*args, **kw)
            layers = self.layers
            queues = self.queues
            while layers or self.at_commit:
                self.pulse.value += 1
                while layers:
                    if self.to_retry:
                        self._retry()
                    q = queues[layers[0]]
                    if q:
                        listener = q.popitem()[0]
                        # if this run gets undone, put the listener back
                        self.on_undo(self.schedule, listener)
                        self.run_rule(listener)
                    else:
                        # lowest layer is exhausted: move on to the next one
                        del queues[layers[0]]
                        heapq.heappop(layers)
                self.cleanup()
            return retval
        except:
            del self.layers[:]
            self.queues.clear()
            raise

    def lock(self, subject):
        """Ensure that `subject`'s manager (if it has one) is being managed"""
        assert self.active, "Subjects must be accessed atomically"
        manager = subject.manager
        if manager is not None and manager not in self.managers:
            self.manage(manager)

    def used(self, subject):
        """Record that the running listener (if any) has read `subject`

        The listener's layer is raised above the subject's if it isn't
        already higher.
        """
        self.lock(subject)
        cl = self.current_listener
        if cl is not None and subject not in self.reads:
            self.reads[subject] = 1
            if cl is not subject and subject.layer >= cl.layer:
                cl.layer = subject.layer + 1

    def changed(self, subject):
        """Record that `subject` has been changed

        Inside a running listener, the change is noted in `writes` for later
        processing; otherwise, the subject's listeners that report themselves
        dirty are scheduled right away.  Raises ``RuntimeError`` if `readonly`
        is set (note that the check happens after the change is recorded).
        """
        self.lock(subject)
        cl = self.current_listener
        if cl is not None:
            self.writes[subject] = cl
        else:
            for listener in subject.iter_listeners():
                if listener.dirty():
                    self.schedule(listener)
        if self.readonly:
            raise RuntimeError("Can't change objects during commit phase")
class LocalController(Controller, threading.local):
    """A ``Controller`` that is also a ``threading.local``"""