# Source: PEAK /src/peak/events/event_threads.py, revision 1719
# Committed Thu Mar 18 22:07:28 2004 UTC by pje.  File size: 12276 byte(s)
#
# Revision log:
#   The 'events.IEventSource' interface now returns a 'canceller' function
#   from the 'addCallback()' method, allowing you to cancel a previously-
#   scheduled callback.  This fixes a memory leak and performance problem
#   with 'events.AnyOf()', which previously could accumulate unneeded
#   callbacks on the sources it was monitoring.  Note that if you have
#   developed any custom event sources with 'addCallback()' methods, you
#   must make sure that they return a canceller from now on.
from __future__ import generators
from peak.core import protocols, adapt, NOT_GIVEN
from interfaces import *
from types import GeneratorType
from sys import exc_info, _getframe
import traceback
from sources import Condition, Value, AnyOf, Observable
import time
from peak.util.advice import advice
# Public names exported by this module; everything else is an
# implementation detail.
__all__ = [
    'resume', 'taskFactory', 'Scheduler', 'Task', 'TaskState', 'Interrupt',
]
def resume():
    """Call this after every 'yield' in a task

    This function returns the event that caused the task to resume, or
    reraises any exceptions thrown by a nested generator in the task.  It
    should be called after every 'yield' statement in a task, if the
    statement yields a generator or an 'ITaskSwitch' or 'IEventSource'.
    (If the statement simply yields a value to its calling generator, it need
    not be followed by an 'events.resume()' call.)

    Note that you should still call this function even if you don't need its
    return value.  Otherwise, errors from other generators will be silently
    lost."""

    # Reach two frames up the stack: frame 1 is the generator that just
    # yielded, frame 2 is the 'Task.step()' frame driving it, whose local
    # 'state' is the ITaskState for this task.  NOTE(review): this only
    # works when called directly from a generator being run by Task.step().
    state = _getframe(2).f_locals['state']

    if state.handlingError:
        try:
            # Re-raise the exception currently propagating through the task
            # (Python 2 three-argument raise preserves the traceback).
            t,v,tb = exc_info()
            raise t,v,tb
        finally:
            # Clear locals so the traceback doesn't pin a reference cycle.
            t=v=tb=None

    # No exception, try to pass event data back into the frame
    return state.lastEvent
class taskFactory(advice):
    """Wrap a generator function so that each call returns a new 'Task'

    Usage::

        def someMethod(self, whatever):
            yield whatever; events.resume()
            # ...
        someMethod = events.taskFactory(someMethod)

    Every call to 'ob.someMethod(whatever)' then produces a fresh
    'events.Task' executing the generator's code on the supplied
    parameter(s).  This also combines with 'binding.Make', e.g.::

        def aTask(self):
            yield self.something; events.resume()
            # ...
        aTask = binding.Make( events.taskFactory(aTask) )

    Here 'ob.aTask' is an 'events.Task' running the 'aTask' method; this is
    often convenient with 'uponAssembly=True', so the task starts as soon
    as the component is assembled."""

    def __call__(self, *args, **kw):
        # Create the generator with the caller's arguments, then wrap it in
        # a Task (which immediately starts running it).
        factory = self._func
        return Task(factory(*args, **kw))
class Interrupt(object):
    """Interrupt a task with an error if specified event occurs

    Usage::

        try:
            yield events.Interrupt( stream.readline(), scheduler.timeout(5) )
            line = events.resume()
        except events.Interruption:
            print "readline() took more than 5 seconds"

    An 'Interrupt' object is an 'events.ITaskSwitch', so you can only use it
    within a task, and you cannot set callbacks on it.  If the supplied
    generator/iterator exits for any reason, the interruption is cancelled.

    Also note that because generator objects are not reusable, neither are
    'Interrupt' objects.  You must create an 'Interrupt' for each desired
    invocation of the applicable generator.  However, the called generator
    need not create an additional 'Interrupt' for any nested generator calls,
    even though multiple interrupts may be active at the same time.  This
    allows you to do things like e.g. set one timeout for each line of data
    being received, and another timeout for receiving an entire email.
    """

    __slots__ = 'iterator','source','errorType'

    protocols.advise(
        instancesProvide=[ITaskSwitch],
    )

    def __init__(self,iterator,eventSource,errorType=Interruption):
        """'Interrupt(iterator,eventSource,errorType=Interruption)'

        Wrap execution of 'iterator' so that it will raise 'errorType' if
        'eventSource' fires before 'iterator' exits (or aborts), assuming that
        the 'Interrupt' is yielded to a task."""
        self.iterator = adapt(iterator,IProcedure)
        self.source = adapt(eventSource,IEventSource)
        self.errorType = errorType

    def nextAction(self,task=None,state=None):
        # ITaskSwitch hook: push the guarded iterator onto the task's stack
        # and register the interrupting callback, then return True so the
        # task continues executing immediately (no suspension here).
        if state is not None:

            cancelled = Condition(False)

            def canceller():
                # Run the wrapped iterator as a nested procedure; once it
                # exits (or aborts), mark the interrupt as cancelled so a
                # late-firing event is ignored, then re-yield the iterator's
                # result to our own caller.
                yield self.iterator; cancelled.set(True); yield resume()

            def interrupt(source,event):
                # Event-source callback: if the iterator hasn't finished
                # yet, inject 'doInterrupt' into the task and resume it.
                if not cancelled():
                    state.CALL(doInterrupt()); task.step(source,event)

            def doInterrupt():
                # Raise 'errorType' inside the task; the unreachable 'yield'
                # only exists to make this function a generator.
                raise self.errorType(resume())
                yield None

            state.CALL(canceller()); self.source.addCallback(interrupt)

        return True
class ProcedureAsTaskSwitch(protocols.Adapter):
    """Adapt a plain procedure (generator/iterator) to 'ITaskSwitch'

    Yielding a procedure inside a task pushes it onto the task's call
    stack, so the task "calls into" it; execution continues immediately."""

    protocols.advise(
        instancesProvide=[ITaskSwitch],
        asAdapterForProtocols=[IProcedure]
    )

    def nextAction(self, task=None, state=None):
        # With no state supplied there is nothing to push; either way,
        # returning True tells the task not to suspend.
        if state is None:
            return True
        state.CALL(self.subject)
        return True
# Declare that generator objects provide 'IProcedure', so they can be
# yielded inside tasks (and adapted via ProcedureAsTaskSwitch) directly.
protocols.declareImplementation(GeneratorType, [IProcedure])
class Scheduler(object):
    """Time-based event sources; see 'events.IScheduler' for interface"""

    protocols.advise( instancesProvide=[IScheduler] )

    def __init__(self, time = time.time):
        # 'time' is the clock function; injectable for testing.
        self.now = time
        self._appointments = []     # (when, callback) pairs, sorted by 'when'
        self.isEmpty = Condition(True)

    def time_available(self):
        # Seconds until the earliest appointment is due (never negative),
        # or None when no appointments are scheduled at all.
        if self._appointments:
            return max(0, self._appointments[0][0] - self.now())

    def tick(self,stop=None):
        # Fire every appointment due as of a single 'now' snapshot.
        # 'stop' may be a mutable flag (e.g. a list) that a callback can
        # populate to abort the loop early — TODO confirm against
        # 'IScheduler' interface docs.
        now = self.now()
        while self._appointments and self._appointments[0][0] <= now and not stop:
            self._appointments.pop(0)[1](self,now)
        self.isEmpty.set(not self._appointments)

    def sleep(self, secs=0):
        # Event source that fires 'secs' after each callback registration.
        return _Sleeper(self,secs)

    def until(self,time):
        # Condition that becomes true once the clock reaches 'time'.
        c = Condition()
        if time <= self.now():
            c.set(True)
        else:
            self._callAt(lambda s,e: c.set(True), time)
        return c

    def timeout(self,secs):
        # Condition that becomes true 'secs' from now.
        return self.until(self.now()+secs)

    def spawn(self, iterator):
        # Run 'iterator' as a task whose steps are driven by this scheduler.
        return _STask(iterator, self)

    def alarm(self, iterator, timeout, errorType=TimeoutError):
        # Interrupt 'iterator' with 'errorType' if it outlives 'timeout'.
        return Interrupt(iterator, self.timeout(timeout), errorType)

    def _callAt(self, what, when):
        # Schedule 'what(self,now)' at time 'when'; returns a zero-argument
        # canceller function.
        self.isEmpty.set(False)
        appts = self._appointments
        item = (when,what)

        # Binary search on the timestamp only: comparing just appts[mid][0]
        # keeps insertion FIFO-stable for equal times and avoids comparing
        # the callback objects themselves.
        lo = 0
        hi = len(appts)

        while lo < hi:
            mid = (lo+hi)//2
            if when < appts[mid][0]:
                hi = mid
            else:
                lo = mid+1

        appts.insert(lo, item)

        # Canceller is a no-op if the appointment has already fired/removed.
        return lambda: (item in appts) and appts.remove(item)
class _Sleeper(object):
    """Relative-delay event source returned by 'IScheduler.sleep()'

    Each 'addCallback()' registration books a fresh appointment 'delay'
    seconds after the moment of registration."""

    protocols.advise(
        instancesProvide=[IEventSource]
    )

    def __init__(self, scheduler, delay):
        self.scheduler = scheduler
        self.delay = delay

    def nextAction(self, task=None, state=None):
        # Register the task to be resumed after the delay; the implicit
        # None return means the task suspends until then.
        if task is not None:
            self.addCallback(task.step)

    def addCallback(self, func):
        # Schedule 'func(self, event)' for 'delay' seconds from now, and
        # hand back the scheduler's canceller per IEventSource.
        fireAt = self.scheduler.now() + self.delay
        return self.scheduler._callAt(
            lambda sched, evt: func(self, evt), fireAt
        )
class TaskState(object):
    """Tracks the state of a task; see 'events.ITaskState' for details"""

    __slots__ = 'lastEvent', 'handlingError', 'stack'

    protocols.advise(
        instancesProvide=[ITaskState]
    )

    def __init__(self):
        self.stack = []                 # pending procedures, innermost last
        self.handlingError = False      # is an exception propagating?
        self.lastEvent = NOT_GIVEN      # value the next resume() returns

    def CALL(self,iterator):
        # Push a nested procedure so it runs next
        procedure = adapt(iterator, IProcedure)
        self.stack.append(procedure)

    def YIELD(self,value):
        # Record the value to hand back via resume()
        self.lastEvent = value

    def RETURN(self):
        # Innermost procedure finished (or failed): drop it
        del self.stack[-1]

    def THROW(self):
        # An exception is now propagating; there is no event value
        self.handlingError = True
        self.lastEvent = NOT_GIVEN

    def CATCH(self):
        # The propagating exception has been handled
        self.handlingError = False
class Task(Observable):
    """Thread-like "task" that pauses and resumes in response to events

    Usage::

        def aGenerator(someArg):
            yield untilSomeCondition; events.resume()
            # ... do something ...

        events.Task(aGenerator(someValue)) # create a task

    When created, tasks run until the first 'yield' operation yielding an
    'ITaskSwitch' results in the task being suspended.  The task will
    then be resumed when the waited-on event fires its callbacks, and so on.

    Tasks offer an 'isFinished' 'ICondition' that can be waited on, or checked
    to find out whether the task has successfully completed.  See
    'events.ITask' for more details."""

    __slots__ = 'isFinished', '_state'

    protocols.advise(
        instancesProvide=[ITask],
        asAdapterForProtocols=[IProcedure],
        factoryMethod='fromProcedure',
    )

    singleFire = False # Broadcast results to listeners
    overrunOK = False # Results are not bufferable

    def __init__(self, iterator):
        Observable.__init__(self)
        self.isFinished = Condition()
        self._state = TaskState()
        # Push the root procedure and start running immediately
        self._state.CALL(iterator); self._start()

    def fromProcedure(klass,ob,proto):
        # Adapter factory: wrap any IProcedure in a (started) Task
        return klass(ob)

    fromProcedure = classmethod(fromProcedure)

    def _start(self):
        """Hook for subclasses to start the task"""
        self.step()

    def _uncaughtError(self):
        """Hook for subclasses to catch exceptions that escape the task

        'SystemExit' ends the task quietly; any other exception propagates
        out to whoever called 'step()'."""
        try:
            raise
        except SystemExit,v:
            self.isFinished.set(True)
            return True

    def step(self, source=None, event=NOT_GIVEN):
        """See 'events.ITask.step'"""
        # Trampoline: drive the innermost generator one yield at a time,
        # pushing/popping nested procedures on the state's stack, until a
        # task switch suspends us or the stack empties (task finished).
        state = self._state
        state.YIELD(event)

        while state.stack:
            switch = event = None # avoid holding previous references

            try:
                # Advance the innermost generator by one step; resume()
                # inside it will reraise if 'handlingError' is still set.
                for event in state.stack[-1]:
                    state.CATCH()
                    break
                else:
                    state.RETURN() # No switch; topmost iterator is finished!
                    state.YIELD(NOT_GIVEN) # don't return a value
                    state.CATCH()
                    continue # resume the next iterator down, or exit
            except:
                # Remove the current iterator, since it can't be resumed
                state.RETURN()
                state.THROW()
                if state.stack:
                    continue # delegate the error to the next iterator
                else:
                    # nobody to delegate to, pass the buck to our caller
                    return self._uncaughtError()

            # Perform the task switch, if any
            try:
                switch = adapt(event,ITaskSwitch,None)
                if switch is not None:
                    # A falsy nextAction() means "suspend until callback"
                    if not switch.nextAction(self,state):
                        state.YIELD(switch) # save reference until we resume
                        return True
                else:
                    # Plain value: expose it via resume()...
                    state.YIELD(event)
                    if len(state.stack)==1:
                        # ...and broadcast it to observers at the top level
                        self._fire(event)
                    else:
                        state.RETURN() # pass the yielded value back to caller
            except:
                state.THROW()
                continue # push the error back into the current iterator

        # Stack is empty: the task ran to completion
        self.isFinished.set(True)
        return True
class _STask(Task):
    """'events.Task' that handles errors better, by relying on a scheduler"""

    __slots__ = 'scheduler','aborted'

    protocols.advise( instancesProvide=[IScheduledTask] )

    def __init__(self, iterator, scheduler):
        self.scheduler = scheduler
        # 'aborted' is set (via the scheduler) when an uncaught,
        # non-SystemExit error escapes the task
        self.aborted = Condition()
        Task.__init__(self,iterator)

    def _start(self):
        # Bypass our scheduled 'step' override: run the first step inline
        # so construction drives the task to its first suspension point.
        super(_STask,self).step()

    def _uncaughtError(self):
        # Pick the condition to signal ('aborted' by default, 'isFinished'
        # for SystemExit), then — in the finally clause, so it happens even
        # when the error propagates — schedule setting it via the scheduler
        # so observers run during a 'tick()', outside the error context.
        condition = self.aborted
        try:
            try:
                raise
            except SystemExit,v:
                condition = self.isFinished
                return True
        finally:
            self.scheduler._callAt(
                lambda e,s: condition.set(True), self.scheduler.now()
            )

    def step(self, source=None, event=NOT_GIVEN):
        """See 'events.IScheduledTask.step'"""
        # Never run inline: queue the real step with the scheduler at the
        # current time, so it executes during the next 'tick()'.
        self.scheduler._callAt(
            lambda e,s: super(_STask,self).step(source,event),
            self.scheduler.now()
        )