[Subversion] / PEAK / src / peak / events / sources.py  

View of /PEAK/src/peak/events/sources.py

Parent Directory | Revision Log
Revision: 1606 - (download) (as text)
Mon Jan 19 03:35:36 2004 UTC (20 years, 3 months ago) by pje
File size: 12299 byte(s)
Lots of 'peak.events' migration:

Added 'events.ISignalSource', that returns 'events.Broadcaster' objects for
signals.  This allows you to yield to signals in an 'events.Thread', or
safely set one-time callbacks on them.

'running.ISignalManager' is now DEPRECATED; please use
'events.ISignalSource' instead.

'running.IMainLoop' has been changed to use an 'events.IReadable' for
the 'lastActivity' attribute, and the 'setExitCode' and 'childForked'
methods have been replaced with an 'exitWith()' method.

The 'peak.running.mainLoop.signalHandler' property has been replaced with
'peak.running.mainLoop.stopOnSignals', which defaults to including SIGINT,
SIGTERM, and SIGBREAK.  If you need custom signal handling, please use
the event sources provided by an 'events.ISignalSource'.

'peak.running.process.ChildProcess' has been rewritten to use
'events.ISignalSource' and an 'events.Thread' to monitor SIGCHLD.  Removed
'checkStatus()' from the 'running.IProcessProxy' interface.

Made most event sources weak-referenceable.

Changed 'events.Thread' to keep a reference to an object it's yielding on,
so that "weak" events like signals and I/O events will hang around until
they call back to the thread.
from peak.core import protocols, adapt, NOT_GIVEN
from interfaces import *
from peak.util.EigenData import AlreadyRead


__all__ = [
    'AnyOf', 'Distributor', 'Value', 'Condition', 'Semaphore',
    'DerivedValue', 'DerivedCondition', 'Observable', 'Readable', 'Writable',
    'Conditional', 'Broadcaster',
]































class AnyOf(object):

    """Union of multiple event sources

    Example usage::

        timedOut = scheduler.timeout(30)
        untilSomethingHappens = events.AnyOf(stream.dataRcvd, timedOut)

        while not timedOut():
            yield untilSomethingHappens; src,evt = events.resume()
            if src is stream.dataRcvd:
                data = event
                print data

    'AnyOf' fires callbacks whenever any of its actual event sources fire (and
    allows threads to continue if any of its actual event sources allow it).
    The 'event' it supplies to its user is actually a '(source,event)' tuple
    as shown above, so you can distinguish which of the actual event sources
    fired, as well as receive the event from it.

    Note that callbacks registered with an 'AnyOf' instance will fire at most
    once, even if more than one of the original event sources fires.  Thus,
    you should not assume in a callback or thread that the event you received
    is the only one that has occurred.  This is especially true in scheduled
    threads, where many things may happen between the triggering of an event
    and the resumption of a thread that was waiting for the event.
    """

    __slots__ = '_sources'

    protocols.advise(
        instancesProvide=[IEventSource]
    )







    def __new__(klass, *sources):
        if len(sources)==1:
            return adapt(sources[0],IEventSource)
        elif sources:
            self =object.__new__(klass)
            self._sources = adapt(sources, protocols.sequenceOf(IEventSource))
            return self

        raise ValueError, "AnyOf must be called with one or more IEventSources"


    def nextAction(self, thread=None, state=None):
        """See 'events.ITaskSwitch.nextAction()'"""

        for source in self._sources:
            action = source.nextAction()

            if action:
                flag = source.nextAction(thread,state)

                if state is not None:
                    state.YIELD( (source,state.lastEvent) )
                return flag

        if thread is not None:
            self.addCallback(thread.step)


    def addCallback(self,func):
        """See 'events.IEventSource.addCallback()'"""

        unfired = [True]
        def onceOnly(source,event):
            if unfired:
                unfired.pop()
                return func(self, (source,event))

        for source in self._sources:
            source.addCallback(onceOnly)


class Observable(object):

    """Base class for a generic event source

    You may subclass this class to create other kinds of event sources: change
    the 'singleFire' class attribute to 'False' in your subclass if you would
    like for events to be broadcast to all callbacks, whether they accept or
    reject the event.
    """

    __slots__ = '_callbacks', '__weakref__'

    singleFire = True

    protocols.advise(
        instancesProvide=[IEventSource]
    )

    def __init__(self):
        self._callbacks = []


    def nextAction(self, thread=None, state=None):
        """See 'events.ITaskSwitch.nextAction()'"""
        if thread is not None:
            self.addCallback(thread.step)


    def addCallback(self,func):
        """See 'events.IEventSource.addCallback()'"""
        self._callbacks.append(func)










    def _fire(self, event):

        callbacks, self._callbacks = self._callbacks, []
        count = len(callbacks)

        try:
            if self.singleFire:
                while count:
                    if callbacks.pop(0)(self,event):
                        return
                    count -= 1
            else:
                while count:
                    callbacks.pop(0)(self,event)
                    count -= 1

        finally:
            self._callbacks[0:0] = callbacks      # put back unfired callbacks























class Distributor(Observable):

    """Sends each event to one callback

    This is perhaps the simplest possible 'IEventSource'.  When its 'send()'
    method is called, the supplied event is passed to any registered
    callbacks.  As soon as a callback "accepts" the event (by returning a true
    value), distribution of the event stops.

    Yielding to an 'events.Distributor' in a thread always suspends the thread
    until the next 'send()' call on the distributor.
    """

    __slots__ = ()

    def send(self,event):
        """Send 'event' to one or more callbacks, until accepted"""
        self._fire(event)


class Broadcaster(Observable):

    """Like a distributor, but broadcasting events to all callbacks"""

    __slots__ = ()

    singleFire   = False

    def send(self,event):
        """Send 'event' to all callbacks"""
        self._fire(event)










class Readable(Observable):

    """Base class for an 'IReadableSource' -- adds a '_value' and '__call__'"""

    __slots__ = '_value'

    singleFire   = False

    protocols.advise(
        instancesProvide=[IReadableSource]
    )

    def __call__(self):
        """See 'events.IReadableSource.__call__()'"""
        return self._value


class Writable(Readable):

    """Base class for an 'IWritableSource' -- adds a 'set()' method"""

    __slots__ = ()

    protocols.advise(
        instancesProvide=[IWritableSource]
    )

    def set(self,value,force=False):
        """See 'events.IWritableSource.set()'"""
        if force or value<>self._value:
            self._value = value
            self._fire(value)









class Value(Writable):

    """Broadcast changes in a variable to all observers

    'events.Value()' instances implement a near-trivial version of the
    'Observer' pattern: callbacks can be informed that a change has been
    made to the 'Value'.  Example::

        aValue = events.Value(42)
        assert aValue()==42
        aValue.set(78)  # fires any registered callbacks
        aValue.set(78)  # doesn't fire, value hasn't changed
        aValue.set(78,force=True)   # force firing even though value's the same

    Events are broadcast to all callbacks, whether they "accept" or "reject"
    the event, and threads yielding to a 'Value' are suspended until the next
    event is broadcast.  The current value of the 'Value' is supplied to
    callbacks and threads via the 'event' parameter, and the 'Value' itself
    is supplied as the 'source'.  (See 'events.IEventSink'.)
    """

    __slots__ = ()

    defaultValue = NOT_GIVEN

    def __init__(self,value=NOT_GIVEN):
        if value is NOT_GIVEN:
            value = self.defaultValue
        self._value = value
        super(Value,self).__init__()











class DerivedValue(Readable):

    """'DerivedValue(formula, *values)' - a value derived from other values

    Usage::

        # 'derived' changes whenever x or y change
        derived = DerivedValue(lambda: x()+y(), x, y)

    A 'DerivedValue' fires an event equal to 'formula()' whenever any of the
    supplied 'values' fire, and the value of 'formula()' is not equal to its
    last known value (if any).
    """

    __slots__ = '_source', '_formula', '_registered'

    def __init__(self,formula,*values):
        self._source = AnyOf(*values)
        self._formula = formula
        self._registered = False
        super(DerivedValue,self).__init__()


    def __call__(self):
        """Get current or cached value of 'formula()'"""
        try:
            return self._value
        except AttributeError:
            value = self._value = self._formula()
            self._register()
            return value


    def _register(self):
        if not self._registered and self._callbacks or hasattr(self,'_value'):
            self._registered = True
            self._source.addCallback(self._set)




    def _set(self,source,event):

        self._registered = False

        if self._callbacks:
            value = self._formula()
            self._register()

            if not hasattr(self,'_value') or self._value<>value:
                self._value = value
                self._fire(value)

        else:
            del self._value
            # no need to register, since we have neither callback nor caching


    def addCallback(self,func):
        """See 'events.IEventSource.addCallback()'"""
        super(DerivedValue,self).addCallback(func)
        self._register()




















class Conditional(Readable):

    """Base class for an 'IConditional': fires if and only if value is true"""

    protocols.advise( instancesProvide=[IConditional] )

    __slots__ = ()

    def nextAction(self, thread=None, state=None):
        """Suspend only if current value is false"""
        value = self()
        if value:
            if state is not None:
                state.YIELD(value)
            return True

        if thread is not None:
            self.addCallback(thread.step)


    def addCallback(self, func):
        """Add callback, but fire it immediately if value is currently true"""

        value = self()

        if value:
            func(self,value)
        else:
            super(Conditional,self).addCallback(func)


    def _fire(self, event):
        """Only transmit true events"""
        if event:
            super(Conditional,self)._fire(event)






class Condition(Conditional,Value):

    """Send callbacks/allow threads to proceed when condition is true

    A 'Condition' is very similar to a 'Value', except in its yielding and
    callback behavior.  Yielding to a 'Condition' in a thread will suspend the
    thread *only* if the current value of the 'Condition' is false.  If the
    'Condition' has a true value, the thread is allowed to proceed, and
    'events.resume()' will return the value of the 'Condition'.  If the
    'Condition' has a false value, the thread will be suspended until
    the value is changed to a true one.

    The behavior for callbacks is similar: when a callback is added with
    'addCallback()', it will be fired immediately if the 'Condition' is true
    at the time the callback is added.  Otherwise, the callback will be fired
    once the 'Condition' becomes true.
    """

    __slots__ = ()
    defaultValue = False





















class DerivedCondition(Conditional,DerivedValue):

    """'DerivedCondition(formula, *values)' - derive condition from value(s)

    Usage::

        # 'derived' is re-evaluated whenever x or y change
        derived = DerivedCondition(lambda: x()>=y(), x, y)

    A 'DerivedCondition' fires an event equal to 'formula()' whenever any of
    the supplied 'values' fire, the value of 'formula()' is not equal to its
    last known value (if any), and the value of 'formula()' is true.

    Note that like other 'events.IConditional' implementations, callbacks
    added to a 'DerivedCondition' will be fired immediately if the current
    value of 'formula()' is true, and threads yielding to a true
    'DerivedCondition' will also proceed immediately without waiting for a
    callback.
    """

    __slots__ = ()




















class Semaphore(Conditional,Value):

    """Allow up to 'n' threads to proceed simultaneously

    A 'Semaphore' is like a 'Condition', except that it does not broadcast
    its events.  Each event is supplied to callbacks only until one "accepts"
    the event (by returning a true value).

    'Semaphore' instances also have 'put()' and 'take()' methods that
    respectively increase or decrease their value by 1.

    Note that 'Semaphore' does not automatically decrease its count due to
    a callback or thread resumption.  You must explicitly 'take()' the
    semaphore in your thread or callback to reduce its count.
    """

    __slots__ = ()

    protocols.advise( instancesProvide=[ISemaphore] )

    singleFire   = True
    defaultValue = 0

    def put(self):
        """See 'events.ICondition.put()'"""
        self.set(self._value+1)

    def take(self):
        """See 'events.ICondition.take()'"""
        self.set(self._value-1)












cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help