from peak import context |
from peak import context |
from peak.events import trellis |
from peak.events import trellis |
from peak.util import addons |
from peak.util import addons, decorators |
from peak.util.extremes import Min, Max |
from peak.util.extremes import Min, Max |
import heapq, weakref, time |
import heapq, weakref, time |
|
|
|
|
trellis.values( |
trellis.values( |
running = False, |
running = False, |
stop_requested = False |
stop_requested = False, _call_queue = None |
) |
) |
trellis.rules( |
trellis.rules( |
_call_queue = lambda self: [] |
_call_queue = lambda self: [], |
|
_next_time = lambda self: Time.next_event_time(True), |
) |
) |
|
_callback_active = False |
|
|
def run(self): |
def run(self): |
"""Loop updating the time and invoking requested calls""" |
"""Loop updating the time and invoking requested calls""" |
assert not self.running, "EventLoop is already running" |
assert not self.running, "EventLoop is already running" |
queue = self._call_queue |
assert not trellis.ctrl.active, "Event loop can't be run atomically" |
|
trellis.atomically(self._setup) |
self.stop_requested = False |
self.stop_requested = False |
self.running = True |
self.running = True |
get_delay = Time.next_event_time |
|
try: |
try: |
while (queue or get_delay(True)) and not self.stop_requested: |
self._loop() |
if queue: |
|
f, args, kw = queue.pop(0) |
|
f(*args, **kw) |
|
if not queue: |
|
if Time.auto_update: |
|
Time.tick() |
|
else: |
|
Time.advance(get_delay(True) or 0) |
|
finally: |
finally: |
self.running = False; self.stop_requested = False |
self.running = False |
|
self.stop_requested = False |
|
|
def stop(self): |
def stop(self): |
"""Stop the event loop at the next opportunity""" |
"""Stop the event loop at the next opportunity""" |
assert self.running, "EventLoop isn't running" |
assert self.running, "EventLoop isn't running" |
self.stop_requested = True |
self.stop_requested = True |
|
|
|
decorators.decorate(trellis.modifier) |
def call(self, func, *args, **kw): |
def call(self, func, *args, **kw): |
"""Call `func(*args, **kw)` at the next opportunity""" |
"""Call `func(*args, **kw)` at the next opportunity""" |
self._call_queue.append((func, args, kw)) |
self._call_queue.append((func, args, kw)) |
|
self._setup() |
|
trellis.on_undo(self._call_queue.pop) |
|
self._callback_if_needed() |
|
|
|
|
|
|
|
def poll(self): |
|
"""Execute up to a single pending call""" |
|
self.flush(1) |
|
|
|
def flush(self, count=0): |
|
"""Execute the specified number of pending calls (0 for all)""" |
|
assert not trellis.ctrl.active, "Event loop can't be run atomically" |
|
queue = self._split_queue(count) |
|
for (f, args, kw) in queue: |
|
f(*args, **kw) |
|
self._callback_if_needed() |
|
if Time.auto_update: |
|
Time.tick() |
|
else: |
|
Time.advance(self._next_time or 0) |
|
|
|
decorators.decorate(trellis.modifier) |
|
def _callback_if_needed(self): |
|
if self._call_queue and not self._callback_active: |
|
self._arrange_callback(self._callback) |
|
self._callback_active = True |
|
|
|
decorators.decorate(trellis.modifier) |
|
def _split_queue(self, count): |
|
queue = self._call_queue |
|
count = count or len(queue) |
|
if queue: |
|
head, self._call_queue = queue[:count], queue[count:] |
|
return head |
|
return () |
|
|
|
def _callback(self): |
|
self._callback_active = False |
|
self.flush(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _loop(self): |
|
"""Subclasses should invoke their external loop here""" |
|
queue = self._call_queue |
|
while (queue or self._next_time) and not self.stop_requested: |
|
self.flush(1) |
|
|
|
def _setup(self): |
|
"""Subclasses should import/setup their external loop here |
|
|
|
Note: must be inherently thread-safe, or else use a cell attribute in |
|
order to benefit from locking. This method is called atomically, but |
|
you should not log any undo actions.""" |
|
|
|
def _arrange_callback(self, func): |
|
"""Subclasses should register `func` to be called by external loop |
|
|
|
Note: Must be safe to call this from a 'foreign' thread.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TwistedEventLoop(EventLoop): |
class TwistedEventLoop(EventLoop): |
context.replaces(EventLoop) |
context.replaces(EventLoop) |
reactor = _delayed_call = None |
reactor = _delayed_call = None |
|
|
trellis.rules( |
|
_next_time = lambda self: Time.next_event_time(True) |
|
) |
|
|
|
def _ticker(self): |
def _ticker(self): |
if Time.auto_update and self.running: |
if self.running: |
|
if Time.auto_update: |
if self._next_time is not None: |
if self._next_time is not None: |
if self._delayed_call and self._delayed_call.active(): |
if self._delayed_call and self._delayed_call.active(): |
self._delayed_call.reset(self._next_time) |
self._delayed_call.reset(self._next_time) |
self._delayed_call = self.reactor.callLater( |
self._delayed_call = self.reactor.callLater( |
self._next_time, Time.tick |
self._next_time, Time.tick |
) |
) |
_ticker = trellis.rule(_ticker) |
if self.stop_requested: |
|
self.reactor.stop() |
|
|
def run(self): |
_ticker = trellis.observer(_ticker) |
|
|
|
def _loop(self): |
"""Loop updating the time and invoking requested calls""" |
"""Loop updating the time and invoking requested calls""" |
assert not self.running, "EventLoop is already running" |
|
if self.reactor is None: |
|
self.setup_reactor() |
|
self.stop_requested = False |
|
Time.tick() |
Time.tick() |
self.running = True |
|
try: |
|
self.reactor.run() |
self.reactor.run() |
finally: |
|
self.running = False |
|
self.stop_requested = False |
|
|
|
def stop(self): |
|
"""Stop the event loop at the next opportunity""" |
|
assert self.running, "EventLoop isn't running" |
|
self.stop_requested = True |
|
self.reactor.stop() |
|
|
|
def call(self, func, *args, **kw): |
|
"""Call `func(*args, **kw)` at the next opportunity""" |
|
if not self._call_queue: |
|
if self.reactor is None: |
|
self.setup_reactor() |
|
self.reactor.callLater(0, self._purge_queue) |
|
self._call_queue.append((func, args, kw)) |
|
|
|
def _purge_queue(self): |
def _arrange_callback(self, func): |
# twisted doesn't guarantee sequential callbacks, but this does... |
self.reactor.callLater(0, func) |
f, args, kw = self._call_queue.pop(0) |
|
f(*args, **kw) |
|
if self._call_queue: |
|
self.reactor.callLater(0, self._purge_queue) |
|
|
|
def setup_reactor(self): |
def _setup(self): |
if self.reactor is None: |
if self.reactor is None: |
from twisted.internet import reactor |
from twisted.internet import reactor |
self.reactor = reactor |
self.reactor = reactor |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WXEventLoop(EventLoop): |
class WXEventLoop(EventLoop): |
"""wxPython version of the event loop |
"""wxPython version of the event loop |
|
|
own risk. :( |
own risk. :( |
""" |
""" |
context.replaces(EventLoop) |
context.replaces(EventLoop) |
|
wx = None |
|
|
wx = _call_queue = None |
|
trellis.rules( |
|
_next_time = lambda self: Time.next_event_time(True) |
|
) |
|
def _ticker(self): |
def _ticker(self): |
if self.running: |
if self.running: |
if Time.auto_update: |
if Time.auto_update: |
if self._next_time is not None: |
if self._next_time is not None: |
self.wx.FutureCall(self._next_time*1000, Time.tick) |
self.wx.FutureCall(self._next_time*1000, Time.tick) |
_ticker = trellis.rule(_ticker) |
if self.stop_requested: |
|
self.wx.GetApp().ExitMainLoop() |
|
_ticker = trellis.observer(_ticker) |
|
|
def run(self): |
def _loop(self): |
"""Loop updating the time and invoking requested calls""" |
"""Loop updating the time and invoking requested calls""" |
assert not self.running, "EventLoop is already running" |
|
if not self.wx: |
|
import wx |
|
self.wx = wx |
|
app = self.wx.GetApp() |
app = self.wx.GetApp() |
assert app is not None, "wx.App not created" |
assert app is not None, "wx.App not created" |
self.running = True |
|
try: |
|
while not self.stop_requested: |
while not self.stop_requested: |
app.MainLoop() |
app.MainLoop() |
if app.ExitOnFrameDelete: # handle case where windows exist |
if app.ExitOnFrameDelete: # handle case where windows exist |
self.stop_requested = True |
self.stop_requested = True |
else: |
else: |
app.ProcessPendingEvents() # ugh |
app.ProcessPendingEvents() # ugh |
finally: |
|
self.running = False |
|
self.stop_requested = False |
|
|
|
def stop(self): |
def _arrange_callback(self, func): |
"""Stop the event loop at the next opportunity""" |
|
assert self.running, "EventLoop isn't running" |
|
self.stop_requested = True |
|
self.wx.GetApp().ExitMainLoop() |
|
|
|
def call(self, func, *args, **kw): |
|
"""Call `func(*args, **kw)` at the next opportunity""" |
"""Call `func(*args, **kw)` at the next opportunity""" |
|
self.wx.CallAfter(func) |
|
|
|
def _setup(self): |
if not self.wx: |
if not self.wx: |
import wx |
import wx |
self.wx = wx |
self.wx = wx |
self.wx.CallAfter(func, *args, **kw) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Time(trellis.Component, context.Service): |
class Time(trellis.Component, context.Service): |
|
|
trellis.values( |
trellis.values( |
_tick = EPOCH._when, |
_tick = EPOCH._when, |
auto_update = True |
auto_update = True, |
|
_schedule = None, |
) |
) |
_now = EPOCH._when |
_now = EPOCH._when |
|
|
_events = lambda self: weakref.WeakValueDictionary(), |
_events = lambda self: weakref.WeakValueDictionary(), |
) |
) |
def _updated(self): |
def _updated(self): |
if self._updated is None: |
|
updated = set() |
|
else: |
|
# Always return the same object, so this rule NEVER changes value! |
|
# This ensures that only the most-recently fired event rules recalc |
|
updated = self._updated |
|
updated.clear() |
|
|
|
while self._tick >= self._schedule[0]: |
while self._tick >= self._schedule[0]: |
key = heapq.heappop(self._schedule) |
key = heapq.heappop(self._schedule) |
if key in self._events: |
if key in self._events: |
updated.add(key) |
self._events.pop(key).value = True |
self._events.pop(key).ensure_recalculation() |
|
return updated |
|
|
|
_updated = trellis.rule(_updated) |
_updated = trellis.rule(_updated) |
|
|
if when not in self._events: |
if when not in self._events: |
if self._now >= when: |
if self._now >= when: |
return True |
return True |
|
if trellis.ctrl.current_listener is not None: |
heapq.heappush(self._schedule, when) |
heapq.heappush(self._schedule, when) |
self._events[when] = e = \ |
trellis.ctrl.changed(self.__cells__['_schedule']) |
trellis.Cell(lambda: e.value or when in self._updated, False) |
self._events[when] = e = trellis.Value(False) |
|
else: |
|
return False |
return self._events[when].value |
return self._events[when].value |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __getitem__(self, interval): |
def __getitem__(self, interval): |
"""Return a timer that's the given offset from the current time""" |
"""Return a timer that's the given offset from the current time""" |
return _Timer(self._now + interval) |
return _Timer(self._now + interval) |
self._set(self.time()) |
self._set(self.time()) |
|
|
def _set(self, when): |
def _set(self, when): |
self._now = when |
trellis.change_attr(self, '_now', when) |
self._tick = when |
self._tick = when |
|
_set = trellis.modifier(_set) |
|
|
def _tick(self): |
def _tick(self): |
if self.auto_update: |
if self.auto_update: |
trellis.poll() |
trellis.poll() |
return tick |
return tick |
return self._tick |
return self._tick |
|
|
_tick = trellis.rule(_tick) |
_tick = trellis.rule(_tick) |
|
|
def next_event_time(self, relative=False): |
def next_event_time(self, relative=False): |