[Subversion] / PEAK / src / peak / events / event_threads.py  

View of /PEAK/src/peak/events/event_threads.py

Parent Directory | Revision Log
Revision: 1606 - (download) (as text)
Mon Jan 19 03:35:36 2004 UTC (20 years, 3 months ago) by pje
File size: 8881 byte(s)
Lots of 'peak.events' migration:

Added 'events.ISignalSource', that returns 'events.Broadcaster' objects for
signals.  This allows you to yield to signals in an 'events.Thread', or
safely set one-time callbacks on them.

'running.ISignalManager' is now DEPRECATED; please use
'events.ISignalSource' instead.

'running.IMainLoop' has been changed to use an 'events.IReadable' for
the 'lastActivity' attribute, and the 'setExitCode' and 'childForked'
methods have been replaced with an 'exitWith()' method.

The 'peak.running.mainLoop.signalHandler' property has been replaced with
'peak.running.mainLoop.stopOnSignals', which defaults to including SIGINT,
SIGTERM, and SIGBREAK.  If you need custom signal handling, please use
the event sources provided by an 'events.ISignalSource'.

'peak.running.process.ChildProcess' has been rewritten to use
'events.ISignalSource' and an 'events.Thread' to monitor SIGCHLD.  Removed
'checkStatus()' from the 'running.IProcessProxy' interface.

Made most event sources weak-referenceable.

Changed 'events.Thread' to keep a reference to an object it's yielding on,
so that "weak" events like signals and I/O events will hang around until
they call back to the thread.
from peak.core import protocols, adapt, NOT_GIVEN
from interfaces import *
from types import GeneratorType
from sys import exc_info, _getframe
import traceback
from sources import Condition, Value
import time
from peak.util.advice import advice

__all__ = [
    'resume', 'threaded', 'Scheduler', 'Thread', 'ThreadState',
]


def resume():
    """Call this after every task/task switch 'yield' in a thread

    This function returns the event that caused the thread to resume, or
    reraises any exceptions thrown by a nested generator in the thread.  It
    should be called after every 'yield' statement in a thread, if the
    statement yields a generator or an 'ITaskSwitch' or 'IEventSource'.
    (If the statement simply yields a value to its calling generator, it need
    not be followed by an 'events.resume()' call.)

    Note that you should still call this function even if you don't need its
    return value.  Otherwise, errors from other generators will be silently
    lost."""

    state = _getframe(2).f_locals['state']

    if state.handlingError:
        try:
            t,v,tb = exc_info()
            raise t,v,tb
        finally:
            t=v=tb=None

    # No exception, try to pass event data back into the frame
    return state.lastEvent


class threaded(advice):

    """Wrap a generator function to return a 'Thread'

    Usage::

        def someMethod(self, whatever):
            yield whatever; events.resume()
            # ...

        someMethod = events.threaded(someMethod)

    When called, 'ob.someMethod(whatever)' will return an 'events.Thread' that
    executes 'someMethod'.  Note that this may also be used in conjunction with
    'binding.Make', e.g.::

        def aThread(self):
            yield self.something; events.resume()
            # ...

        aThread = binding.Make( events.threaded(aThread) )

    In this case, 'ob.aThread' will be an 'events.Thread' that runs the
    'aThread' method.  This is often convenient to use with 'uponAssembly=True',
    so that the thread is started as soon as the component is assembled."""

    def __call__(self,*__args,**__kw):
        return Thread(self._func(*__args,**__kw))













class TaskAsTaskSwitch(protocols.Adapter):

    protocols.advise(
        instancesProvide=[ITaskSwitch],
        asAdapterForProtocols=[ITask]
    )

    def nextAction(self,thread=None,state=None):
        if state is not None:
            state.CALL(self.subject)
        return True

protocols.declareImplementation(GeneratorType, [ITask])




























class Scheduler(object):

    """Time-based event sources; see 'events.IScheduler' for interface"""

    protocols.advise( instancesProvide=[IScheduler] )

    def __init__(self, time = time.time):
        self.now = time
        self._appointments = []


    def time_available(self):
        if self._appointments:
            return max(0, self._appointments[0][0] - self.now())


    def tick(self):
        now = self.now()
        while self._appointments and self._appointments[0][0] <= now:
            self._appointments.pop(0)[1](self,now)


    def sleep(self, secs=0):
        return _Sleeper(self,secs)

    def until(self,time):
        c = Condition()
        if time <= self.now():
            c.set(True)
        else:
            self._callAt(lambda s,e: c.set(True), time)
        return c

    def timeout(self,secs):
        return self.until(self.now()+secs)


    def spawn(self, iterator):
        return _SThread(iterator, self)


    def _callAt(self, what, when):

        appts = self._appointments
        lo = 0
        hi = len(appts)

        while lo < hi:
            mid = (lo+hi)//2
            if when < appts[mid][0]:
                hi = mid
            else:
                lo = mid+1

        appts.insert(lo, (when,what))


class _Sleeper(object):

    protocols.advise(
        instancesProvide=[IEventSource]
    )

    def __init__(self,scheduler,delay):
        self.scheduler = scheduler
        self.delay = delay

    def nextAction(self,thread=None,state=None):
        if thread is not None:
            self.addCallback(thread.step)

    def addCallback(self,func):
        self.scheduler._callAt(
            lambda s,e: func(self,e), self.scheduler.now() + self.delay
        )







class ThreadState(object):

    """Tracks the state of a thread; see 'events.IThreadState' for details"""

    __slots__ = 'lastEvent', 'handlingError', 'stack'

    protocols.advise(
        instancesProvide=[IThreadState]
    )

    def __init__(self):
        self.lastEvent = NOT_GIVEN
        self.handlingError = False
        self.stack = []

    def CALL(self,iterator):
        self.stack.append( adapt(iterator,ITask) )

    def YIELD(self,value):
        self.lastEvent = value

    def RETURN(self):
        self.stack.pop()

    def THROW(self):
        self.handlingError = True
        self.lastEvent = NOT_GIVEN

    def CATCH(self):
        self.handlingError = False











class Thread(object):
    """Lightweight "thread" that responds to events, using generator/iterators

    Usage::

        def aGenerator(someArg):
            yield untilSomeCondition; events.resume()
            # ... do something ...

        events.Thread(aGenerator(someValue))    # create the thread

    When created, threads run until the first 'yield' operation yielding an
    'ITaskSwitch' results in the thread being suspended.  The thread will
    then be resumed when the waited-on event fires its callbacks, and so on.

    Threads offer an 'isFinished' 'ICondition' that can be waited on, or checked
    to find out whether the thread has successfully completed.  See
    'events.IThread' for more details."""

    __slots__ = 'isFinished', '_state'

    protocols.advise( instancesProvide=[IThread] )

    def __init__(self, iterator):
        self.isFinished = Condition()
        self._state = ThreadState()
        self._state.CALL(iterator)
        self._start()

    def _start(self):
        """Hook for subclasses to start the thread"""
        self.step()

    def _uncaughtError(self):
        """Hook for subclasses to catch exceptions that escape the thread"""
        try:
            raise
        except SystemExit,v:
            self.isFinished.set(True)
            return True

    def step(self, source=None, event=NOT_GIVEN):
        """See 'events.IThread.step'"""
        state = self._state; state.YIELD(event)

        while state.stack:
            switch = event = None   # avoid holding previous references
            try:
                for event in state.stack[-1]:
                    state.CATCH()
                    break
                else:
                    state.RETURN() # No switch; topmost iterator is finished!
                    state.CATCH()
                    continue    # resume the next iterator down, or exit
            except:
                # Remove the current iterator, since it can't be resumed
                state.RETURN()
                state.THROW()

                if state.stack:
                    continue    # delegate the error to the next iterator
                else:
                    # nobody to delegate to, pass the buck to our caller
                    return self._uncaughtError()

            # Perform the task switch, if any
            try:
                switch = adapt(event,ITaskSwitch,None)
                if switch is not None:
                    if not switch.nextAction(self,state):
                        state.YIELD(switch) # save reference until we resume
                        return True
                else:
                    state.YIELD(event)
            except:
                state.THROW()
                continue    # push the error back into the current iterator

        self.isFinished.set(True)
        return True

class _SThread(Thread):

    """'events.Thread' that handles errors better, by relying on a scheduler"""

    __slots__ = 'scheduler','aborted'

    protocols.advise( instancesProvide=[IScheduledThread] )

    def __init__(self, iterator, scheduler):
        self.scheduler = scheduler
        self.aborted = Condition()
        Thread.__init__(self,iterator)


    def _start(self):
        super(_SThread,self).step()


    def _uncaughtError(self):
        condition = self.aborted
        try:
            try:
                raise
            except SystemExit,v:
                condition = self.isFinished
                return True
        finally:
            self.scheduler._callAt(
                lambda e,s: condition.set(True), self.scheduler.now()
            )


    def step(self, source=None, event=NOT_GIVEN):
        """See 'events.IScheduledThread.step'"""
        self.scheduler._callAt(
            lambda e,s: super(_SThread,self).step(source,event),
            self.scheduler.now()
        )




cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help