[Subversion] / Contextual / peak / util / context.py  

View of /Contextual/peak/util/context.py

Parent Directory | Revision Log
Revision: 2226 - (download) (as text)
Thu Jul 20 02:31:46 2006 UTC (17 years, 9 months ago) by pje
File size: 16797 byte(s)
Use ProxyTypes' proxy types instead of defining local ones.
"""Dynamic, contextual variables and services"""

__all__ = [
    'Action','Resource', 'Config','Setting','SettingConflict','NoValueFound',
    'Namespace', 'Global', 'new', 'snapshot', 'swap',   # system
    'Proxy', 'call_with', 'with_', 'manager', 'gen_exc_info',    # PEP 343 impl.
]
    # XXX: clonef, qname, default_fallback, Replaceable, Scope, Globals, replace

from peak.util.symbols import Symbol, NOT_GIVEN, NOT_FOUND
from peak.util.proxies import ObjectWrapper, CallbackProxy

try:
    from thread import get_ident
except ImportError:
    from dummy_thread import get_ident

import sys
from new import function
contexts = {}


def _read_ctx():
    tid = get_ident()
    try:
        return contexts[tid]
    except KeyError:
        d = contexts[tid] = {}
        return d

def _write_ctx():
    tid = get_ident()
    try:
        d = contexts[tid]
    except KeyError:
        d = contexts[tid] = {}
    if '__frozen__' in d:
        d = contexts[tid] = d.copy()
        del d['__frozen__']
    return d

def with_(ctx, func):
    """Perform PEP 343 "with" logic for Python versions <2.5

    The following examples do the same thing at runtime::

        Python 2.5+          Python 2.4
        ------------         -------------
        with x as y:         z = with_(x,f)
            z = f(y)

    This function is used to implement the ``call_with()`` decorator, but
    can also be used directly.  It's faster and more compact in the case where
    the function ``f`` already exists.
    """
    inp = ctx.__enter__()
    try:
        retval = func(inp)
    except:
        if not ctx.__exit__(*sys.exc_info()):
            raise
    else:
        ctx.__exit__(None, None, None)
        return retval

def reraise():
    """Reraise the current contextmanager exception, if any"""
    typ,val,tb = gen_exc_info()
    if typ:
        try:
            raise typ,val,tb
        finally:
            del typ,val,tb

def replaces(target):
    def decorator(cls):
        assert issubclass(cls,Replaceable)
        cls.current = staticmethod(target.current)
        return cls
    from peak.util.decorators import decorate_class
    decorate_class(decorator)

def call_with(ctxmgr):
    """Emulate the PEP 343 "with" statement for Python versions <2.5

    The following examples do the same thing at runtime::

        Python 2.5+          Python 2.4
        ------------         -------------
        with x as y:         @call_with(x)
            print y          def do_it(y):
                                 print y

    ``call_with(foo)`` returns a decorator that immediately invokes the
    function it decorates, passing in the same value that would be bound by
    the ``as`` clause of the ``with`` statement.  Thus, by decorating a
    nested function, you can get most of the benefits of "with", at a cost of
    being slightly slower and perhaps a bit more obscure than the 2.5 syntax.

    Note: because of the way decorators work, the return value (if any) of the
    ``do_it()`` function above will be bound to the name ``do_it``.  So, this
    example prints "42"::

        @call_with(x)
        def do_it(y):
            return 42

        print do_it

    This is rather ugly, so you may prefer to do it this way instead, which
    more explicitly calls the function and gets back a value::

        def do_it(y):
            return 42

        print with_(x,do_it)
    """
    return with_.__get__(ctxmgr, type(ctxmgr))





class _GeneratorContextManager(object):
    """Helper class for ``@context.manager``"""

    __slots__ = "geniter"

    def __init__(self, geniter):
        self.geniter = geniter

    def __enter__(self):
        for value in self.geniter:
            return value
        else:
            raise RuntimeError("Generator didn't yield a value")

    def __exit__(self,*exc):
        try:
            old = gen_exc_info()
            _write_ctx()[gen_exc_info] = exc
            try:
                for value in self.geniter:
                    break
                else:
                    return True     # generator swallowed exception
            except:
                if not exc or sys.exc_info()[1] is not exc[1]: raise
                return False
            raise RuntimeError("Generator didn't stop")
        finally:
            _write_ctx()[gen_exc_info] = old


def manager(func):
    """Emulate 2.5 ``@contextmanager`` decorator"""
    def helper(*args, **kwds):
        return _GeneratorContextManager(func(*args, **kwds))
    helper.__name__ = func.__name__
    helper.__doc__  = func.__doc__
    return helper



def make_variable(name, factory, doc, module, scope):
    _not_given = NOT_GIVEN
    _qname = qname
    read_scope = scope.get_read_scope_getter()
    write_scope = scope.get_write_scope_getter()

    def var(value = _not_given):
        if value is _not_given:
            _scope = read_scope()
            if _scope is None:
                raise RuntimeError(
                    "%s cannot be used outside of %s scope" %
                    (_qname(var), _qname(scope))
                )
            try:
                return _scope[var]
            except KeyError:
                value = factory()
                write_scope()[var] = value
                return value

        write_scope()[var] = value

    var = clonef(var,name,doc,module,level=3)
    var.__clone__ = lambda name, factory=factory: make_variable(
        name, factory, doc, module, scope
    )
    return var

def clonef(f, name, doc, module, frame=None, level=2):
    """Clone a func with reset globals to look like it came from elsewhere"""
    if module is not None:
        __import__(module)
        _globals = sys.modules[module].__dict__
    else:
        _globals = (frame or sys._getframe(level)).f_globals
    # clone the function using the targeted module (or caller's) globals
    f = function(f.func_code, _globals, name, f.func_defaults, f.func_closure)
    f.__doc__  = doc  or f.__doc__
    return f

def qname(f):
    if hasattr(f,'__module__'):
        m = f.__module__
    else:
        m = f.func_globals.get('__name__')
    if m:
        return '%s.%s' % (m,f.__name__)
    return f.__name__

@manager
def _pusher(f,val):
    """Helper that sets and resets a context global"""
    old = f()
    _write_ctx()[f] = val
    yield None  # XXX should this be old, or val instead?
    _write_ctx()[f] = old
    reraise()

def default_fallback(config,key):
    """Look up the key in the config's parent scope, or error message"""
    try:
        return config.parent[key]
    except TypeError:
        if config.parent is None:
            raise NoValueFound(key)
        raise


_ctx_stack = object()

def delegated_enter(self):
    ctx = self.__context__()
    _write_ctx().setdefault(_ctx_stack,[]).append(ctx)
    return ctx.__enter__()

def delegated_exit(self, typ, val, tb):
    ctx = _read_ctx()[_ctx_stack].pop()
    return ctx.__exit__(typ, val, tb)



class ScopedClass(type):
    def __init__(cls, name, bases, cdict):
        super(ScopedClass, cls).__init__(cls,name,bases,cdict)
        if cls.scope is not None and 'current' not in cdict:
            cls.current = staticmethod(
                make_variable(
                    name+".current", cls.__default__,
                    "Get or set the current "+name+" scope",
                    cls.__module__, cls.scope
                )
            )


class Globals(object):

    @classmethod
    def get_read_scope_getter(cls):
        return _read_ctx

    @classmethod
    def get_write_scope_getter(cls):
        return _write_ctx

    def __new__(*args):
        raise TypeError("Globals can't be instantiated")


@manager
def replace(func, val):
    """Context that temporarily replaces a scope, variable, or proxy"""
    if type(func).__name__=='FunctionProxy' and hasattr(func,'__func__'):
        func = func.__func__
    elif isinstance(func, ScopedClass) and hasattr(func, 'current'):
        func = func.current
    old = func()
    func(val)
    yield val
    func(old)
    reraise()


class Replaceable(object):

    __slots__ = ()
    __metaclass__ = ScopedClass

    scope = Globals

    @classmethod
    def __default__(cls):
        return cls()

    def __context__(self):
        return replace(self.current, self)

    __enter__ = delegated_enter
    __exit__  = delegated_exit

    @classmethod
    def proxy(cls):
        return Proxy(cls.current)


class Scope(Replaceable):
    __slots__ = ()

    @classmethod
    def get_read_scope_getter(cls):
        return cls.current

    @classmethod
    def get_write_scope_getter(cls):
        return cls.current









class Config(Scope):
    __slots__ = 'parent', 'data'

    def __init__(self, parent=NOT_GIVEN):
        if parent is NOT_GIVEN:
            parent = self.current()
        self.parent = parent
        self.data = {}

    @classmethod
    def __default__(cls):
        return cls.root     # default Config is the root config

    def __getitem__(self,key):
        try:
            return self.data[key]
        except KeyError:
            fallback = getattr(key, '__config_fallback__', default_fallback)
            self[key] = value = fallback(self,key)
            return value

    def __setitem__(self,key,val):
        old = self.data.setdefault(key,val)
        if old is not val and old != val:
            raise SettingConflict(
                "a different value for %s is already defined" % qname(key)
            )

Config.root = Config(None)












class Action(Scope):
    __slots__ = 'config', 'managers', 'cache', 'status'

    @classmethod
    def get_write_scope_getter(cls):
        return Config.current

    @classmethod
    def __default__(cls):
        return None     # no default Action

    def __init__(self, config=None):
        self.managers = []
        self.cache = {}
        self.status = {}
        if config is None:
            config = self.get_write_scope_getter()()
        self.config = config

    def __getitem__(self,key):
        try:
            res = self.cache[key]
        except KeyError:
            cfg = self.config
            factory = cfg[key]
            status = self.status.get(key)
            if status:
                raise RuntimeError(
                    "Circular dependency for %s (via %s)"
                    % (qname(key),factory)
                )
            self.status[key] = 1    # recursion guard
            try:
                res = self.cache[key] = self.manage(
                    with_(cfg, lambda arg: factory())
                )
            finally:
                del self.status[key]
        return res


    def manage(self, ob):
        try:
            enter = ob.__enter__
        except AttributeError:
            return ob
        ctx = ob
        ob = ctx.__enter__()

        # don't call __exit__ unless __enter__ succeeded
        # (if there was an error, we wouldn't have gotten this far)
        self.managers.append(ctx)
        return ob

    def __enter__(self):
        if self.managers:
            raise RuntimeError("Action is already in use")
        self.manage(replace(self.current, self))

    def __exit__(self, *exc):
        if not self.managers:
            raise RuntimeError("Action is not currently in use")

        managers = self.managers
        while managers:
            managers.pop().__exit__(*exc)  # XXX how do we handle errors?

        self.cache.clear()

    # TODO: prevent closed resource access during __exit__












class SettingConflict(Exception):
    """Attempt to set conflicting value in a scope"""

class NoValueFound(LookupError):
    """No value was found for the setting or resource"""

def Setting(name="setting", default=None, doc=None, module=None, scope=Config):
    doc = doc or \
        """A context.Setting that was defined without a docstring

        Call with zero arguments to get its value in the current scope, or with
        one argument to change the setting to the passed-in value.  Note that
        settings may only be changed once in a given scope, and then only if
        they have not been read within that scope.  Settings that are read
        without having been set inherit their value from the parent
        configuration of the current configuration.
        """
    setting = make_variable(name, lambda:default, doc, module, scope)
    if default is not NOT_GIVEN:
        Config.root[setting] = default
    return setting

def Resource(name="resource", factory=None, doc=None, module=None, scope=Action):
    doc = doc or \
        """A context.Resource that was defined without a docstring

        Call with zero arguments to get the instance for the current action
        scope, or with one argument to set the resource's factory in the
        current *configuration* scope (which may not be the same configuration
        being used by the current action scope).
        """
    resource = make_variable(name, lambda:factory, doc, module, scope)

    if factory is not NOT_GIVEN:
        Config.root[resource] = factory

    return resource




class Namespace(ObjectWrapper):

    def __getattr__(self, key):
        try:
            return self.__dict__[key]
        except KeyError:
            if key.startswith('func_') or key.startswith('__'):
                return getattr(self.__subject__,key)
            return self[key]

    def __getitem__(self,key):
        if '.' in key:
            for key in key.split('.'):
                self = self[key]
            return self
        try:
            return self.__dict__[key]
        except KeyError:
            # TODO: verify syntax of key: nonempty, valid chars, ...?
            val = self.__clone__("%s.%s" % (self.__name__,key), NOT_GIVEN)
            if key=='*':
                if hasattr(self,'__namespace__'):
                    ns = self.__namespace__
                    val.__config_fallback__ = lambda m,k: m[ns['*']]
                else:
                    val.__config_fallback__ = lambda m,k: default_fallback
            else:
                val.__config_fallback__ = lambda m,k: m[self['*']](m,k)

            val.__namespace__ = self
            self.__dict__[key] = val = Namespace(val)
            return val

    def __contains__(self,key):
        for key in key.split('.'):
            if key not in self.__dict__:
                return False
            self = self[key]
        return True


    def __iter__(self):
        for key in self.__dict__:
            if not key.startswith('__') and key!='*':
                yield key


def Global(name="unnamed_global", default=None, doc=None, module=None):

    _not_given = NOT_GIVEN
    __read = _read_ctx
    __pusher = _pusher

    def f(value = _not_given):
        """A context.Global that was defined without a docstring

        Call with zero arguments to get its value, or with one argument to
        receive a contextmanager that temporarily sets the global to the
        passed-in value, for the duration of the "with:" block it's used to
        create.
        """
        if value is _not_given:
            return __read().get(f,default)
        else:
            return __pusher(f,value)

    f = clonef(f,name,doc,module)   # *must* rebind f to cloned function here
    f.__clone__ = (
        lambda name=name,default=default,doc=doc,
        module=f.func_globals.get('__name__'): Global(name,default,doc,module))
    return f

gen_exc_info = Global("gen_exc_info", (None,None,None))

Proxy = CallbackProxy







def new():
    """Return a new, empty thread state"""
    return {'__frozen__':True}

def snapshot():
    """Return a snapshot of the all of this thread's current state globals

    This returns an object that can be passed into ``context.swap()`` to
    restore all context globals to what they were at this point in time.
    A copy-on-write strategy is used, so that snapshots can always be taken
    in constant time, and no extra memory is used if multiple snapshots are
    taken during a period where no changes have occurred.
    """
    ctx = _read_ctx()
    ctx['__frozen__'] = True
    return ctx

def swap(state):
    """Set this thread's state to `snapshot`, returning a "before" snapshot

    `snapshot` should be a value previously returned by either the
    ``snapshot()`` function or by ``swap()``.  Before the thread state is set,
    a new snapshot is taken and returned.

    The passed-in snapshot is not modified by this routine, so you can reuse
    it as many times as you want to restore that particular state.
    """
    old = snapshot()
    contexts[get_ident()] = state
    return old


def doctest_suite():
    import doctest
    return doctest.DocFileSuite(
        'context.txt', optionflags=doctest.ELLIPSIS, module_relative=False,
    )





cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help