[Subversion] / Trellis / peak / events / activity.py  

View of /Trellis/peak/events/activity.py

Parent Directory | Revision Log
Revision: 2436 - (download) (as text)
Fri Nov 30 22:46:43 2007 UTC (16 years, 5 months ago) by pje
File size: 9766 byte(s)
The new algorithm lands - version bump to 0.6a1dev.
Too many changes to list!
from peak import context
from peak.events import trellis
from peak.util import addons, decorators
from peak.util.extremes import Min, Max
import heapq, weakref, time

__all__ = [
    'Time', 'EPOCH', 'NOT_YET', 'EventLoop', 'WXEventLoop', 'TwistedEventLoop',
]

try:
    set
except NameError:
    from sets import Set as set
        


























class _Timer(object):
    """Value representing a moment in time"""

    __slots__ = '_when'

    def __init__(self, _when):
        self._when = _when

    def __getitem__(self, interval):
        """Get a timer that's offset from this one by `interval` seconds"""
        if self is NOT_YET: return self
        return _Timer(self._when + interval)

    def __sub__(self, other):
        """Get the interval in seconds between two timers"""
        if not isinstance(other, _Timer):
            raise TypeError("Can't subtract %r from timer" % (other,))
        return self._when - other._when

    def __eq__(self, other):
        if not isinstance(other, _Timer):
            return False
        return self._when == other._when

    def __ne__(self, other):
        return not self==other

    def __ge__(self, other):
        return not self < other

    def __nonzero__(self):
        return Time.reached(self)

    def __lt__(self, other):
        if not isinstance(other, _Timer):
            raise TypeError # for now
        return self._when < other._when

    def __hash__(self):
        return hash(self._when)

    def begins_with(self, flag):
        """Keep track of the moment when `flag` first becomes true"""
        if flag:
            return min(self, Time[0])
        return NOT_YET

EPOCH = _Timer(0)
NOT_YET = _Timer(Max)
    
































class EventLoop(trellis.Component, context.Service):
    """Run an application event loop"""

    trellis.values(
        running = False,
        stop_requested = False, _call_queue = None
    )
    trellis.rules(
        _call_queue = lambda self: [],
        _next_time = lambda self: Time.next_event_time(True),
    )
    _callback_active = False

    def run(self):
        """Loop updating the time and invoking requested calls"""
        assert not self.running, "EventLoop is already running"
        assert not trellis.ctrl.active, "Event loop can't be run atomically"
        trellis.atomically(self._setup)
        self.stop_requested = False
        self.running = True
        try:
            self._loop()
        finally:
            self.running = False
            self.stop_requested = False
            
    def stop(self):
        """Stop the event loop at the next opportunity"""
        assert self.running, "EventLoop isn't running"
        self.stop_requested = True

    decorators.decorate(trellis.modifier)
    def call(self, func, *args, **kw):
        """Call `func(*args, **kw)` at the next opportunity"""
        self._call_queue.append((func, args, kw))
        self._setup()
        trellis.on_undo(self._call_queue.pop)
        self._callback_if_needed()



    def poll(self):
        """Execute up to a single pending call"""
        self.flush(1)

    def flush(self, count=0):
        """Execute the specified number of pending calls (0 for all)"""
        assert not trellis.ctrl.active, "Event loop can't be run atomically"
        queue = self._split_queue(count)
        for (f, args, kw) in queue:
            f(*args, **kw)
        self._callback_if_needed()
        if Time.auto_update:
            Time.tick()                                        
        else:
            Time.advance(self._next_time or 0)

    decorators.decorate(trellis.modifier)
    def _callback_if_needed(self):
        if self._call_queue and not self._callback_active:
            self._arrange_callback(self._callback)
            self._callback_active = True
        
    decorators.decorate(trellis.modifier)
    def _split_queue(self, count):
        queue = self._call_queue
        count = count or len(queue)
        if queue:
            head, self._call_queue = queue[:count], queue[count:]
            return head
        return ()

    def _callback(self):
        self._callback_active = False
        self.flush(1)







    def _loop(self):
        """Subclasses should invoke their external loop here"""
        queue = self._call_queue
        while (queue or self._next_time) and not self.stop_requested:
            self.flush(1)

    def _setup(self):
        """Subclasses should import/setup their external loop here

        Note: must be inherently thread-safe, or else use a cell attribute in
        order to benefit from locking.  This method is called atomically, but
        you should not log any undo actions."""

    def _arrange_callback(self, func):
        """Subclasses should register `func` to be called by external loop

        Note: Must be safe to call this from a 'foreign' thread."""
























class TwistedEventLoop(EventLoop):
    """Twisted version of the event loop"""

    context.replaces(EventLoop)    
    reactor = _delayed_call = None

    def _ticker(self):
        if self.running:
            if Time.auto_update:
                if self._next_time is not None:
                    if self._delayed_call and self._delayed_call.active():
                        self._delayed_call.reset(self._next_time)
                    else:
                        self._delayed_call = self.reactor.callLater(
                            self._next_time, Time.tick
                        )
            if self.stop_requested:
                self.reactor.stop()

    _ticker = trellis.observer(_ticker)

    def _loop(self):
        """Loop updating the time and invoking requested calls"""
        Time.tick()
        self.reactor.run()

    def _arrange_callback(self, func):
        self.reactor.callLater(0, func)

    def _setup(self):
        if self.reactor is None:
            from twisted.internet import reactor
            self.reactor = reactor








class WXEventLoop(EventLoop):
    """wxPython version of the event loop

    This isn't adequately tested; the wx event loop is completely hosed when
    it comes to running without any windows, so it's basically impossible to
    unit test without mocking 'wx' (which I haven't tried to do.  Use at your
    own risk.  :(
    """
    context.replaces(EventLoop)    
    wx = None

    def _ticker(self):
        if self.running:
            if Time.auto_update:
                if self._next_time is not None:
                    self.wx.FutureCall(self._next_time*1000, Time.tick)
            if self.stop_requested:
                self.wx.GetApp().ExitMainLoop()
    _ticker = trellis.observer(_ticker)

    def _loop(self):
        """Loop updating the time and invoking requested calls"""
        app = self.wx.GetApp()
        assert app is not None, "wx.App not created"
        while not self.stop_requested:
            app.MainLoop()
            if app.ExitOnFrameDelete:   # handle case where windows exist
                self.stop_requested = True
            else:
                app.ProcessPendingEvents()  # ugh

    def _arrange_callback(self, func):
        """Call `func(*args, **kw)` at the next opportunity"""
        self.wx.CallAfter(func)
            
    def _setup(self):
        if not self.wx:
            import wx
            self.wx = wx


class Time(trellis.Component, context.Service):
    """Manage current time and intervals"""

    trellis.values(
        _tick = EPOCH._when,
        auto_update = True,
        _schedule = None,
    )
    _now   = EPOCH._when

    trellis.rules(
        _schedule = lambda self: [Max],
        _events = lambda self: weakref.WeakValueDictionary(),
    )
    def _updated(self):
        while self._tick >= self._schedule[0]:
            key = heapq.heappop(self._schedule)
            if key in self._events:
                self._events.pop(key).value = True

    _updated = trellis.rule(_updated)   

    def reached(self, timer):
        when = timer._when
        if when not in self._events:
            if self._now >= when:
                return True
            if trellis.ctrl.current_listener is not None:
                heapq.heappush(self._schedule, when)
                trellis.ctrl.changed(self.__cells__['_schedule'])
                self._events[when] = e = trellis.Value(False)
            else:
                return False
        return self._events[when].value







    def __getitem__(self, interval):
        """Return a timer that's the given offset from the current time"""
        return _Timer(self._now + interval)

    def advance(self, interval):
        """Advance the current time by the given interval"""
        self._set(self._now + interval)

    def tick(self):
        """Update current time to match ``time.time()``"""
        self._set(self.time())

    def _set(self, when):
        trellis.change_attr(self, '_now', when)
        self._tick = when
    _set = trellis.modifier(_set)

    def _tick(self):
        if self.auto_update:
            tick = self._now = self.time()            
            trellis.poll()
            return tick
        return self._tick
    _tick = trellis.rule(_tick)

    def next_event_time(self, relative=False):
        """The time of the next event to occur, or ``None`` if none scheduled

        If `relative` is True, returns the number of seconds until the event;
        otherwise, returns the absolute ``time.time()`` of the event.
        """
        now = self._tick   # ensure recalc whenever time moves forward
        when = self._schedule[0]
        if when is Max:
            return None
        if relative:
            return when - now
        return when

    def time(self): return time.time()


cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help