[Subversion] / Contextual / context_tests.txt  

View of /Contextual/context_tests.txt

Parent Directory | Revision Log
Revision: 2279 - (download)
Sat Feb 24 05:37:44 2007 UTC (17 years, 1 month ago) by pje
File size: 13199 byte(s)
Major API overhaul.  Service classes now act like peak.binding.Singletons,
in that the class itself is a proxy for the current instance.  This
eliminates the need for two names to refer to the "same" object.  Settings
are now created with decorators, the module is peak.context instead of
peak.util.context, and many many other changes.  And there are still more
to come, but mostly additions and some tweaks to how the App context works.
==========================
``peak.context`` Internals
==========================

>>> from peak import context

Function Cloning
================

The ``_clonef()`` function makes a new function that "looks like" a source
function (i.e., has the same name, docstring, module, and attributes), but
whose implementation and arguments come from a different function::

    >>> def source_func(x,y,z):
    ...     """The doc is here"""
    ...     return 99
    >>> source_func.a = 99

    >>> help(source_func)
    Help on function source_func:
    <BLANKLINE>
    source_func(x, y, z)
        The doc is here...

    >>> def impl(a,b):
    ...     "foo"
    ...     return 42

    >>> f3 = context._clonef(source_func, impl)

    >>> help(f3)
    Help on function source_func:
    <BLANKLINE>
    source_func(a, b)
        The doc is here...

    >>> f3(1,2)
    42

    >>> f3.a
    99
    >>> source_func.a = 57
    >>> f3.a
    99

Parameter Lookup
================

Simple set/get::

    >>> key1 = object()
    >>> key2 = object()
    >>> context._set_param(key1, 42)
    >>> context._set_param(key2, 57)
    >>> context._get_param(key1), context._get_param(key2)
    (42, 57)


Values are per-thread::

    >>> def test_other_thread():
    ...     context._set_param(key1, "hey you")
    ...     context._set_param(key2, "whazzup?")
    ...     print (context._get_param(key1), context._get_param(key2))

    >>> from threading import Thread
    >>> t = Thread(target=test_other_thread)
    >>> t.start(); t.join()
    ('hey you', 'whazzup?')

    >>> context._get_param(key1), context._get_param(key2)
    (42, 57)

Setting a key to ``NOT_GIVEN`` deletes it::

    >>> context._set_param(key1, context.NOT_GIVEN)

Accessing an unset key raises a ``KeyError``::

    >>> context._get_param(key1)
    Traceback (most recent call last):
      ...
    KeyError: <object object at ...>

But deleting an unset key does *not*::

    >>> context._set_param(key1, context.NOT_GIVEN)


Parameter Objects
=================

``@context.parameter`` is a decorator that turns a zero-argument function into
a dynamic variable, whose default value in a new context is the function's
return value when called in that context::

    >>> def my_param():
    ...     """A Parameter"""
    ...     return 42   # default value

    >>> my_param = context.parameter(my_param)

    >>> help(my_param)
    Help on function my_param...:
    <BLANKLINE>
    my_param(value=NOT_GIVEN)
        A Parameter...

    >>> my_param()
    42

Calling a parameter function with an argument returns a context manager that
temporarily sets the parameter to that value (so you can say, e.g.,
``with my_param(99):`` and have it work)::

    >>> cm = my_param(99)
    >>> cm.__enter__()
    99
    >>> my_param()
    99
    >>> cm.__exit__(None, None, None)
    >>> my_param()
    42

It should also save the un-computed state of an as-yet-unused parameter::

    >>> def p2():
    ...     print "computing"
    ...     return 69
    >>> p2 = context.parameter(p2)
    >>> cm = p2(57)
    >>> cm.__enter__()
    57
    >>> p2()
    57
    >>> cm.__exit__(None, None, None)
    >>> p2()
    computing
    69



Delegated Contexts
==================

    >>> class dg(context.Delegated):
    ...     def __context__(self):
    ...         print "beginning"
    ...         yield 42
    ...         print "ending"
    ...     __context__ = context.manager(__context__)

    >>> d1 = dg()
    >>> d1.__enter__()
    beginning
    42
    >>> d2 = dg()
    >>> d2.__enter__()
    beginning
    42
    >>> d2.__exit__(None, None, None)
    ending
    >>> d1.__exit__(None, None, None)
    ending

    >>> d1.__enter__()
    beginning
    42
    >>> d2.__enter__()
    beginning
    42
    >>> d1.__exit__(None, None, None)
    Traceback (most recent call last):
      ...
    AssertionError: context stack is corrupted

    >>> d1.__exit__(None, None, None)
    ending


Services
========

    >>> class S1(context.Service): pass
    >>> class S2(context.Service): context.replaces(S1)

    >>> S1.get is S2.get
    True

    >>> S1.get() is S2.get()
    True

    >>> s1 = S1.get()
    >>> type(s1)
    <class 'S1'>

    >>> s2 = S2()
    >>> s2.__enter__()
    <S2 object at ...>
    >>> s2 is S1.get() is S2.get()
    True
    >>> s2.__exit__(None,None,None)
    >>> s1 is S1.get()
    True

The ``__default__()`` classmethod of a service is called to create its default
instance in a given context::

    >>> class S3(context.Service):
    ...     def __default__(cls):
    ...         print "creating default instance of", cls
    ...         return cls()
    ...     __default__ = classmethod(__default__)

    >>> s3 = S3.get()
    creating default instance of <class 'S3'>

``context.replaces()`` can only be called inside a ``context.Service`` subclass
definition, and only once::

    >>> context.replaces(S1)
    Traceback (most recent call last):
      ...
    SyntaxError: Class decorators may only be used inside a class statement

    >>> class X:
    ...     context.replaces(S1)
    Traceback (most recent call last):
      ...
    TypeError: context.replaces() can only be used in a context.Service subclass

    >>> class X(context.Service):   # doctest: +NORMALIZE_WHITESPACE
    ...     context.replaces(S1)
    ...     context.replaces(S3)
    Traceback (most recent call last):
      ...
    ValueError: replaces() must be used only once per class; there is already a
    value for ``get``: <bound method ...get of <class 'S1'>>



Configuration Objects
=====================

    >>> c = context.Config()

    >>> c.parent is context.Config.root
    True

    >>> c2 = context.Config(c)
    >>> c2.parent is c
    True

    >>> c2.__enter__()
    <...Config object ...>

    >>> c3 = context.Config()
    >>> c3.parent is c2
    True
    >>> c2.__exit__(None, None, None)

    >>> c2[1] = 2
    >>> c2[1]
    2
    >>> c2[1] = 2
    >>> c2[1] = 3
    Traceback (most recent call last):
      ...
    SettingConflict: a different value for 1 is already defined

    >>> c3[1] = 9
    >>> c3[1]
    9

    >>> def aSetting(cfg, key):
    ...     print "Computing for ", cfg, key
    ...     return 42

    >>> aSetting = context.setting(aSetting)
    >>> aSetting()  # gets computed & cached in the root
    Computing for  <...context.Config object...> <function aSetting...>
    42

    >>> c.__enter__()
    <...context.Config object ...>

    >>> aSetting()
    42
    >>> c[aSetting] = 42
    >>> context.Config[aSetting] = 42
    
    >>> c.__exit__(None, None, None)

    >>> c3[aSetting] = 99
    >>> c3[aSetting]
    99
    >>> c3[aSetting] = 99



Applications and Services
=========================

By default, service factories are always executed in the context of the
application's main configuration, even if a different configuration is active
at the time the service is requested::

    >>> a = context.App(context.Config())

    >>> def factory():
    ...     print aSetting()
    ...     return S2()
    >>> a.config[S1] = factory
    >>> context.with_(a.swap(), lambda a: S1.get())
    42
    <S2 object ...>

    >>> a = context.App(context.Config())
    >>> a.config[S1] = factory
    >>> c = context.Config(a.config)
    >>> c[aSetting] = 999
    >>> context.with_(a.swap(), lambda a: context.with_(c, lambda c: S1.get()))
    42
    <S2 object ...>



Namespaces
==========

    >>> ns1 = context.namespace(aSetting)
    >>> ns1['foo']
    <function aSetting.foo ...>

    >>> foo = ns1['foo']
    >>> foo is ns1['foo']
    True

    >>> foo.bar
    <function aSetting.foo.bar ...>
    >>> foo.bar is foo['bar']
    True

    >>> 'spam' in foo
    False

    >>> 'bar' in foo
    True

    >>> list(foo)
    ['bar']

    >>> foo.bar.spam is foo['bar.spam']
    True

    >>> 'bar.spam' in foo
    True

    >>> def handler(cfg,key):
    ...     print "looking up", cfg, key
    ...     return key.__name__

    >>> context.Config.root[foo['*']] = handler

    >>> foo.baz()
    looking up <...context.Config object ...> <function aSetting.foo.baz ...>
    'aSetting.foo.baz'

    >>> foo.bar.bang()  # 2nd level rule lookup
    looking up <...context.Config ...> <function aSetting.foo.bar.bang ...>
    'aSetting.foo.bar.bang'

    >>> ns1.whee()
    Traceback (most recent call last):
      ...
    KeyError: <function aSetting.whee ...>

    >>> foo['really.really.long.name.needing.multiple.rules']()
    looking up ...foo.really.really.long.name.needing.multiple.rules ...
    'aSetting.foo.really.really.long.name.needing.multiple.rules'

    >>> sorted(foo)
    ['bar', 'baz', 'really']


Actions
=======

    >>> print context.Action.get()
    Traceback (most recent call last):
      ...
    RuntimeError: No Action is currently active

    >>> cfg = context.Config()
    >>> act = context.Action()

    >>> class TestResource(object):
    ...     def __enter__(self):
    ...         print "Setting up"
    ...         return self
    ...     def __exit__(self, typ, val, tb):
    ...         print "Tearing down", map(str,(typ,val,tb))

    >>> res = context.resource(lambda: TestResource())

    >>> res()
    Traceback (most recent call last):
      ...
    RuntimeError: No Action is currently active

    >>> act.__enter__()
    >>> context.Action.get() is act
    True

    >>> act.__enter__()
    Traceback (most recent call last):
      ...
    RuntimeError: Action is already in use

    >>> r1 = res()
    Setting up

    >>> r1
    <TestResource object...>

    >>> r2 = res()
    >>> r1 is r2
    True

    >>> act.__exit__(None,None,None)
    Tearing down ['None', 'None', 'None']

    >>> print context.Action.get()
    Traceback (most recent call last):
      ...
    RuntimeError: No Action is currently active

    >>> act.__exit__(None,None,None)
    Traceback (most recent call last):
      ...
    RuntimeError: Action is not currently in use

    >>> act.__enter__()
    >>> res()
    Setting up
    <TestResource object...>

    >>> r3 = res()
    >>> r4 = res()
    >>> r3 is r4
    True
    >>> r3 is r2
    False

    >>> act.__exit__(TypeError,TypeError("Foo"),None)
    Tearing down [...'exceptions.TypeError'..., 'Foo', 'None']

    >>> c1 = context.Config()
    >>> def my_factory():
    ...     if context.Config.get() is c1:
    ...         print "Factory running in c1"
    ...     return 42
    >>> c1[res] = my_factory
    >>> c1[res]
    <function my_factory ...>

    >>> act = context.Action(c1)
    >>> act.__enter__()
    >>> res()
    Factory running in c1
    42
    >>> res()
    42

    >>> class Failure(object):
    ...     def __enter__(self): raise RuntimeError("Foo!")
    ...     def __exit__(self,*exc):
    ...         raise AssertionError("This shouldn't get called!")

    >>> res2 = context.resource(lambda: Failure())
    >>> res2()
    Traceback (most recent call last):
      ...
    RuntimeError: Foo!

    >>> def recursive_factory(): return recursive_resource()
    >>> recursive_resource = context.resource(recursive_factory)
    >>> recursive_resource()    # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
      ...
    RuntimeError: Circular dependency for recursive_factory (via <function
    recursive_factory ...>)

    >>> act.__exit__(None,None,None)  # no __exit__ for failed __enter__




PEP 343 Implementation Tests
============================

An example context manager::

    >>> def demo_manager(value):
    ...     print "before"
    ...
    ...     try:
    ...         yield value
    ...         context.reraise()   # propagate error, if any
    ...     except TypeError, exc:
    ...         print "caught TypeError"
    ...     print "after"
    >>> demo_manager = context.manager(demo_manager)

Applied to a simple function usign ``with_()``::

    >>> def something(ob):
    ...     print "got", ob
    ...     return 42
    >>> context.with_(demo_manager(99), something)
    before
    got 99
    after
    42

Errors in the called function get passed to ``__exit__()`` and reraised::

    >>> def fail(v):
    ...     raise TypeError("foo")

    >>> context.with_(demo_manager(57), fail)
    before
    caught TypeError
    after

    >>> def fail(v):
    ...     raise KeyError("foo")
    >>> try:
    ...     print context.with_(demo_manager(57), fail)
    ... except KeyError:
    ...     print "KeyError escaped!"
    before
    KeyError escaped!

You don't have to decorate a new function to use ``call_with`` or ``with_()``::

    >>> def something(ob):
    ...     print "got", ob
    ...     return 42

Just wrap the context object, then pass the result the function to call::

    >>> context.call_with(demo_manager(None))(something)
    before
    got None
    after
    42

Or use ``with_()``, passing in the context and the function in one call::

    >>> context.with_(demo_manager(None),something)
    before
    got None
    after
    42

Finally, notice that ``__enter__`` may return a different object, which will be
passed in to the called function as its sole argument::

    >>> context.call_with(demo_manager(99))(something)
    before
    got 99
    after
    42

    >>> context.with_(demo_manager(99),something)
    before
    got 99
    after
    42


cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help