View of /PEAK/src/peak/binding/components.py
Parent Directory
| Revision Log
Revision:
2781 -
(
download)
(
as text)
Sun Nov 1 00:48:40 2015 UTC (8 years, 4 months ago) by
pje
File size: 26314 byte(s)
Fix component acquisition defaults and doc/test lookupComponent()
"""Basic binding tools"""
from __future__ import generators
from peak.api import *
from once import *
from once import _warnIfPermission
from interfaces import *
from attributes import *
from types import ModuleType
from peak.naming.names import toName, AbstractName, COMPOUND_KIND, IName
from peak.naming.syntax import PathSyntax
from peak.util.EigenData import AlreadyRead
from peak.config.interfaces import IConfigKey, IConfigurationRoot, \
NullConfigRoot, IConfigurable
from peak.config.registries import ImmutableConfig
from peak.util.imports import importString, whenImported
from peak.util.decorators import decorate
__all__ = [
'Component', 'Obtain', 'Require', 'Delegate', 'Configurable',
'getRootComponent', 'getParentComponent', 'lookupComponent',
'acquireComponent', 'notifyUponAssembly', 'PluginsFor', 'PluginKeys',
'getComponentName', 'getComponentPath', 'ComponentName', 'iterParents',
'hasParent',
]
from _once import BaseDescriptor
class _proxy(BaseDescriptor):

    # Descriptor stand-in that only remembers an attribute name.  It is what
    # '_Base._getBindingFuncs()' hands out for attributes that are not
    # stored in slots.  It can never produce a value of its own, so both
    # hooks below simply raise 'AttributeError'.

    def __init__(self, attrName):
        self.attrName = attrName

    def usageError(self):
        # report the proxied attribute's own name
        raise AttributeError(self.attrName)

    def computeValue(self, ob, d, a):
        # nothing to compute; report the name that was requested
        raise AttributeError(a)
def iterParents(component, max_depth=100):
    """Yield 'component', then each of its parents in turn, root last

    The walk ends when 'getParentComponent()' returns 'None'.  To guard
    against runaway or cyclic hierarchies, a 'RuntimeError' is raised when
    the consumer asks for more after 'max_depth' components have been
    yielded; its second argument is the last component yielded.  Note that
    the limit is checked before the next parent is fetched, so the error is
    raised even if that component turns out to have no further parent."""

    remaining = max_depth

    while component is not None:
        yield component
        remaining -= 1
        if not remaining:
            raise RuntimeError("maximum recursion limit exceeded", component)
        component = getParentComponent(component)
def hasParent(component, parent):
    """Return true if 'parent' is 'component' itself or one of its ancestors

    Ancestors are compared by identity ('is'), walking 'iterParents()'."""

    for ancestor in iterParents(component):
        if ancestor is parent:
            break
    else:
        # ran off the top of the hierarchy without meeting 'parent'
        return False

    return True
def _setupCriterion(strategy):
    """Define 'HasParentCriterion' once the 'strategy' module is available

    This is registered as an import hook for 'dispatch.strategy' (see the
    'whenImported()' call below the definitions), and receives that module
    as 'strategy'.  It publishes two module-level names via 'global':
    'dispatch_by_hierarchy' and 'HasParentCriterion'."""

    global HasParentCriterion, dispatch_by_hierarchy

    def dispatch_by_hierarchy(table, ob):
        # Walk 'ob' and its parents; the first one whose 'id()' is a key in
        # 'table' selects the entry.  'table[None]' is the fallback entry.
        for comp in iterParents(ob):
            oid = id(comp)
            if oid in table:
                return table[oid]
        return table[None]

    class HasParentCriterion(strategy.IdentityCriterion):
        # Criterion testing whether an object lies within the hierarchy of
        # a specific component.

        __slots__ = ()

        ptr = strategy.IdentityCriterion.subject    # alias

        dispatch_function = staticmethod(dispatch_by_hierarchy)

        # Reuse the underlying function of 'AbstractCriterion.matches'
        # NOTE(review): presumably this bypasses an override in
        # 'IdentityCriterion' -- confirm against dispatch.strategy
        matches = strategy.AbstractCriterion.matches.im_func

        def __init__(self,component):
            # the criterion's subject is a 'strategy.Pointer' to 'component'
            super(HasParentCriterion,self).__init__(strategy.Pointer(component))

        def parent_criteria(self):
            # one criterion for the subject component and each of its parents
            return [HasParentCriterion(p) for p in iterParents(self.ptr.ref())]

        def __contains__(self,ob):
            # 'ob' is expected to offer '.ref()' returning the pointed-to
            # object; it matches if that object or any ancestor is our subject
            # NOTE(review): assumes a Pointer compares equal to the 'id()'
            # of its referent -- confirm against dispatch.strategy
            if ob is not None:
                for comp in iterParents(ob.ref()):
                    if id(comp) == self.ptr:
                        return True
            return False
def _setupParse(predicates):
    """Teach the predicate parser about 'hasParent(expr, constant)'

    This is registered as an import hook for 'dispatch.predicates' and
    receives that module as 'predicates'.  It adds a case to
    'predicates.expressionSignature' that turns a two-argument call to
    'hasParent' whose second argument is a constant into a signature based
    on 'HasParentCriterion'."""

    # NOTE: the bracketed expression below is a decorator for the function
    # that follows it, so the two must stay adjacent.
    [predicates.expressionSignature.when(
        # matches 'hasParent(expr,Const)'
        "expr in predicates.Call and expr.function == hasParent"
        " and len(expr.argexprs)==2 and expr.argexprs[1] in predicates.Const"
    )]
    def convertHasParentToCriterion(expr,criterion):
        # the constant second argument is the component to test against
        typecheck = HasParentCriterion(expr.argexprs[1].value)
        if not criterion.truth:
            # the expression was negated, so invert the criterion
            typecheck = ~typecheck
        # apply the criterion to the call's first argument expression
        return dispatch.strategy.Signature([(expr.argexprs[0],typecheck)])
# Defer the 'dispatch' integration: each setup function is handed its module
# by 'whenImported()' rather than this module importing 'dispatch' itself.
whenImported('dispatch.strategy',_setupCriterion)
whenImported('dispatch.predicates',_setupParse)
def getComponentPath(component, relativeTo=None):
    """Return the 'ComponentName' leading from 'relativeTo' to 'component'

    When 'relativeTo' is 'None' or omitted, the path is relative to the root
    component of 'component' and begins with an empty segment (i.e. it is an
    absolute path).  Otherwise 'relativeTo' must be an ancestor (parent,
    parent's parent, etc.) of 'component'.  Components whose name is empty
    or 'None' are represented by a '*' segment."""

    segments = []
    root = None

    if relativeTo is None:
        root = getRootComponent(component)

    # Walk upward, prepending names so the result reads from the top down
    for node in iterParents(component):
        if node is root:
            segments.insert(0, '')
            break
        if node is relativeTo:
            break
        segments.insert(0, getComponentName(node) or '*')

    return ComponentName(segments)
# Generic function dispatching on the type/protocol of 'component'; the
# cases that follow are registered with '.when()'.  Each bracketed decorator
# expression must stay directly above the function it decorates.
[dispatch.on('component')]
def getParentComponent(component):
    """Return parent of 'component', or 'None' if unknown or non-component

    This also works for module objects, and 'binding.ActiveClass' objects,
    for which the containing module or package is returned.

    This is a generic function, so you can add cases for additional object
    types using 'binding.getParentComponent.when()' as a decorator.
    """

# Objects providing 'IComponent' report their own parent
[getParentComponent.when(IComponent)]
def get_parent_of_node(component):
    return component.getParentComponent()

# A module's parent is its containing package, imported by dotted name;
# top-level modules (no dot in '__name__') have no parent
[getParentComponent.when(ModuleType)]
def get_parent_of_module(component):
    m = '.'.join(component.__name__.split('.')[:-1])
    if m: return importString(m)
    return None

# NOTE(review): '__parent__' is presumably a one-element sequence holding
# the containing module or class -- confirm against 'ActiveClass'
[getParentComponent.when(ActiveClass)]
def get_parent_of_ActiveClass(component):
    return component.__parent__[0]

# Fallback: arbitrary objects have no known parent
[getParentComponent.when(object)]
def get_parent_of_object(component):
    return None
# Generic function dispatching on the type/protocol of 'component'; the
# cases that follow are registered with '.when()'.  Each bracketed decorator
# expression must stay directly above the function it decorates.
[dispatch.on('component')]
def getComponentName(component):
    """Return name of 'component', or 'None' if root or non-component

    This also works for module objects, and 'binding.ActiveClass' objects,
    for which the module or class' '__name__' is returned.

    This is a generic function, so you can add cases for additional object
    types using 'binding.getComponentName.when()' as a decorator.
    """

# Objects providing 'IComponent' report their own name
[getComponentName.when(IComponent)]
def get_name_of_node(component):
    return component.getComponentName()

# For modules, only the last segment of the dotted '__name__' is used
[getComponentName.when(ModuleType)]
def get_name_of_module(component):
    return component.__name__.split('.')[-1]

# 'ActiveClass' objects carry their component name in '__cname__'
[getComponentName.when(ActiveClass)]
def get_name_of_ActiveClass(component):
    return component.__cname__

# Fallback: arbitrary objects have no component name
[getComponentName.when(object)]
def get_name_of_object(component):
    return None
def getRootComponent(component):
    """Return the topmost ancestor of 'component'

    This is the last item produced by 'iterParents()'; 'component' itself is
    returned unchanged if the walk produces nothing (e.g. for 'None')."""

    root = component
    for root in iterParents(component):
        pass    # just advance to the final item

    return root
def notifyUponAssembly(parent,child):
    """Arrange for 'child.uponAssembly()' to be called once 'parent' is ready

    The request is handed to the nearest object, starting at 'parent' and
    moving up via 'getParentComponent()', that has a 'notifyUponAssembly'
    attribute.  If no such object exists, 'child.uponAssembly()' is called
    immediately."""

    while True:
        try:
            notify = parent.notifyUponAssembly
        except AttributeError:
            # this object can't track assembly; try the next one up
            parent = getParentComponent(parent)
            if parent is None:
                child.uponAssembly()
                return
        else:
            notify(child)
            return
def acquireComponent(component, name, default=NOT_GIVEN):
    """Acquire attribute 'name' from 'component' or its nearest ancestor

    'name' is looked up with 'getattr()' on 'component', then on its parent,
    and so on up to the root component; the first value found is returned.

    If no object in the hierarchy has the attribute, the last object
    examined is adapted to 'config.IConfigurationRoot' (falling back to
    'NullConfigRoot' if it can't be adapted), and the result of its
    'nameNotFound()' method is returned.  That method receives 'default',
    and decides whether to look further, return the default, or raise an
    error."""

    root = component

    for root in iterParents(component):
        found = getattr(root, name, NOT_FOUND)
        if found is not NOT_FOUND:
            return found

    # Nothing in the hierarchy had it; let the (possibly null) root decide
    configRoot = adapt(root, IConfigurationRoot, NullConfigRoot)
    return configRoot.nameNotFound(root, name, component, default=default)
class ComponentName(AbstractName):

    """Path between components

    Component Path Syntax

        Paths are '"/"'-separated attribute names.  The segments '"."' and
        '".."' have the same meaning as in URLs.  A path with a leading
        '"/"' (or a compound name whose first segment is empty) is an
        "absolute path", resolved from the root component of the starting
        component.

        A path that begins with anything other than '"/"', '"./"', or
        '"../"' is acquired: its first segment is looked up using
        'acquireComponent()' before the rest of the path is processed.  (See
        'acquireComponent()' for details.)  To prevent acquisition, prefix
        the path with './' so that it is relative to the starting object.

        Every segment after the first is an attribute name, looked up on the
        object that the preceding segments resolved to.  '.' and '..' are
        handled the same way as in the first segment.
    """

    nameKind = COMPOUND_KIND

    syntax = PathSyntax(
        direction = 1,
        separator = '/',
    )

    protocols.advise(
        instancesProvide=[IComponentKey]
    )

    def findComponent(self, component, default=NOT_GIVEN):
        """Resolve this path starting from 'component'

        Returns the object found.  If some segment can't be resolved,
        'default' is returned when one was supplied; otherwise
        'exceptions.NameNotFound' is raised (or, for a failed acquisition
        of the first segment, whatever 'acquireComponent()' raises)."""

        if not self:
            # an empty path designates the starting component itself
            return component

        segments = iter(self)
        attr = segments.next()
        step = _getFirstPathComponent(attr)

        if step is not None:
            # '', '.' or '..' -- no acquisition involved
            ob = step(component)
        elif default is NOT_GIVEN:
            # no default, so simply let acquisition errors propagate
            ob = acquireComponent(component, attr)
        elif len(self)>1:
            # more segments follow; use a marker to detect failure below
            ob = acquireComponent(component, attr, NOT_FOUND)
        else:
            # single segment: acquisition handles the default by itself
            return acquireComponent(component, attr, default)

        resolved = []

        try:
            for attr in segments:
                if ob is NOT_FOUND:
                    break
                step = _getNextPathComponent(attr)
                if step is None:
                    ob = getattr(ob, attr)
                else:
                    ob = step(ob)
                resolved.append(attr)
            else:
                # every segment was consumed successfully
                return ob
        except AttributeError:
            pass    # fall through to the default/error handling

        if default is NOT_GIVEN:
            # 'attr' is the segment that failed; the iterator holds the rest
            raise exceptions.NameNotFound(
                resolvedName = ComponentName(resolved),
                remainingName = ComponentName([attr] + list(segments)),
                resolvedObj = ob
            )

        return default
# Handlers for the special path segments recognized by 'ComponentName'.
# Each name below is the bound '.get' of a table, so looking up an ordinary
# attribute name yields 'None'.

# First segment: '' starts at the root, '.' at the component itself, and
# '..' at the component's parent
_getFirstPathComponent = {
    '':   getRootComponent,
    '.':  lambda x:x,
    '..': getParentComponent,
}.get

# Later segments: '' and '.' stay on the current object, '..' moves up
_getNextPathComponent = {
    '':   lambda x:x,
    '.':  lambda x:x,
    '..': getParentComponent,
}.get
def lookupComponent(component, name, default=NOT_GIVEN, adaptTo=None,
    creationName=None, suggestParent=True):

    """Look up the component key 'name', starting from 'component'

    'name' may be any object that implements or can be adapted to
    'IComponentKey': 'peak.naming' names, interface objects, property names,
    or custom objects of your own that implement 'IComponentKey'.  A string
    is converted to a URL, or to a 'ComponentName' if it has no URL prefix.

    If the key can't be found, 'exceptions.NameNotFound' is raised, unless a
    'default' other than 'NOT_GIVEN' was provided.

    If 'adaptTo' is not 'None', the result is adapted to it before being
    returned.  Unless 'suggestParent' is false, the (possibly adapted)
    result is passed to 'suggestParentComponent()' along with 'component'
    and 'creationName'.
    """

    key = adapt(name, IComponentKey)
    found = key.findComponent(component, default)

    if adaptTo is not None:
        found = adapt(found, adaptTo)

    if suggestParent:
        suggestParentComponent(component, creationName, found)

    return found
# Declare that strings (both 'str' and 'unicode') are usable as component
# keys: they are adapted to 'IComponentKey' by converting them to names with
# 'toName()', using 'ComponentName' as the default name class.
# NOTE(review): the trailing '1' is passed through to 'toName()' as-is; see
# 'peak.naming.names.toName' for what that argument controls.
#
protocols.declareAdapter(
    lambda ob: toName(ob, ComponentName, 1),
    provides = [IComponentKey],
    forTypes = [str, unicode],
)
class ConfigFinder(object):

    """Component key that looks up a configuration key (utility/property)

    Declared as the adapter from 'IConfigKey' to 'IComponentKey'; it wraps
    the configuration key and resolves it with 'config.lookup()'."""

    __slots__ = ('ob',)

    protocols.advise(
        instancesProvide = [IComponentKey],
        asAdapterForProtocols = [IConfigKey]
    )

    def __init__(self, ob):
        # 'ob' is the configuration key being adapted
        self.ob = ob

    def findComponent(self, component, default=NOT_GIVEN):
        wrappedKey = self.ob
        return config.lookup(component, wrappedKey, default)

    def __repr__(self):
        # display as the wrapped key would
        return '%r' % (self.ob,)
class PluginKeys(object):

    """Component key that finds the keys of plugins matching a given key

    Usage::

        # get a sorted list of the keys to all 'foo.bar' plugins
        pluginNames = binding.Obtain( binding.PluginKeys('foo.bar') )

        # get an unsorted list of the keys to all 'foo.bar' plugins
        pluginNames = binding.Obtain(
            binding.PluginKeys('foo.bar', sortBy=None)
        )

    'sortBy' is either a false value, or a callable that is applied to each
    key to obtain the value it should be sorted on.  With a false value, the
    keys are returned in the order that 'config.iterKeys()' yields them.
    The default is 'str', i.e. keys are sorted by their string form.
    """

    protocols.advise(
        instancesProvide = [IComponentKey],
    )

    def __init__(self, configKey, sortBy=str):
        self.configKey = adapt(configKey, IConfigKey)
        self.sortBy = sortBy

    def findComponent(self, component, default=NOT_GIVEN):
        # 'default' is accepted for 'IComponentKey' compatibility, but is
        # not used: the result is always a (possibly empty) list

        found = config.iterKeys(component, self.configKey)
        sortBy = self.sortBy

        if not sortBy:
            return list(found)

        # decorate-sort-undecorate on the computed sort values
        decorated = [(sortBy(key), key) for key in found]
        decorated.sort()
        return [key for (rank, key) in decorated]
class PluginsFor(PluginKeys):

    """Component key that finds plugins matching a configuration key

    Usage::

        # get a list of 'my.plugins.X' plugins, sorted by property name
        myPlugins = binding.Obtain( binding.PluginsFor('my.plugins') )

        # get an unsorted list of all 'foo.bar' plugins
        myPlugins = binding.Obtain(
            binding.PluginsFor('foo.bar', sortBy=None)
        )

    This key type works similarly to 'PluginKeys()', except that it returns the
    plugins themselves, rather than their configuration keys.

    'sortBy' is either a false value or a callable that will be applied to
    each plugin's key to get a value for sorting purposes.  If set to a false
    value, plugins will be in the same order as their keys are yielded by
    'config.iterKeys()'.  'sortBy' defaults to 'str', which means the plugins
    will be sorted based on the string form of the keys used to retrieve them.
    """

    def findComponent(self, component, default=NOT_GIVEN):
        """Return the list of plugins found relative to 'component'

        The matching keys come from 'PluginKeys.findComponent()'; each one is
        then adapted to 'IComponentKey' and looked up on 'component' without
        a default.  'default' is accepted for 'IComponentKey' compatibility
        but is not used."""

        keys = super(PluginsFor,self).findComponent(component)
        return [adapt(k,IComponentKey).findComponent(component) for k in keys]
class Obtain(Attribute):

    """'Obtain(componentKey,[default=value])' - finds/caches a needed component

    Usage examples::

        class someClass(binding.Component):

            thingINeed = binding.Obtain("path/to/service")
            otherThing = binding.Obtain(IOtherThing)
            aProperty  = binding.Obtain(PropertyName('some.prop'), default=42)

    'someClass' instances can then refer to their attributes, such as
    'self.thingINeed', instead of repeatedly calling
    'self.lookupComponent(someKey)'.

    The initial argument to the 'Obtain' constructor must be adaptable to
    'binding.IComponentKey'.  If a 'default' keyword argument is supplied,
    it will be used as the default in case the specified component key is not
    found.

    XXX need to document IComponentKey translations somewhere... probably
        w/IComponentKey"""

    # Class-level fallbacks for the per-instance settings
    default = NOT_GIVEN
    targetName = None

    def __init__(self,targetName,metadata=None,**kw):
        # Adapt the key right away, so a bad key fails at definition time
        self.targetName = adapt(targetName, IComponentKey)

        # 'metadata' travels with the other keywords to the base class
        kw['metadata'] = metadata
        _warnIfPermission(kw)
        super(Obtain,self).__init__(**kw)

    def computeValue(self, obj, instanceDict, attrName):
        # The attribute's value is whatever the key finds relative to 'obj'
        key = self.targetName
        return key.findComponent(obj, self.default)

    def __repr__(self):
        # Show the documentation, if any, below the constructor-style form
        doc = self.__doc__
        if not doc:
            return "binding.Obtain(%r)" % (self.targetName,)
        return "binding.Obtain(%r):\n\n%s" % (self.targetName,doc)
class SequenceFinder(object):

    """Component key that resolves a whole sequence of component keys

    Declared as the adapter from 'protocols.sequenceOf(IComponentKey)' to
    'IComponentKey'; the result of a lookup is a tuple with one entry per
    key, in the same order."""

    __slots__ = ('ob',)

    protocols.advise(
        instancesProvide = [IComponentKey],
        asAdapterForProtocols = [protocols.sequenceOf(IComponentKey)]
    )

    def __init__(self, ob):
        # 'ob' is the sequence of component keys
        self.ob = ob

    def findComponent(self, component, default=NOT_GIVEN):
        # every key is looked up with the same 'default'
        found = [key.findComponent(component, default) for key in self.ob]
        return tuple(found)
class Delegate(Make):

    """Delegate attribute to the same attribute of another object

    Usage::

        class PasswordFile(binding.Component):
            shadow = binding.Obtain('config:etc.shadow/')
            checkPwd = changePwd = binding.Delegate('shadow')

    The above is equivalent to this longer version::

        class PasswordFile(binding.Component):
            shadow = binding.Obtain('config:etc.shadow/')
            checkPwd = binding.Obtain('shadow/checkPwd')
            changePwd = binding.Obtain('shadow/changePwd')

    Because 'Delegate' uses the attribute name being looked up, you do not
    need to create a separate binding for each attribute that is delegated,
    as you do when using 'Obtain()'."""

    # Class-level fallback for the per-instance setting
    delegateAttr = None

    def __init__(self, delegateAttr, metadata=None, **kw):

        def delegate(obj, instanceDict, attrName):
            # fetch the same-named attribute from the delegate object
            target = getattr(obj, delegateAttr)
            return getattr(target, attrName)

        # 'metadata' travels with the other keywords to the base class
        kw['metadata'] = metadata
        _warnIfPermission(kw)
        super(Delegate,self).__init__(delegate,delegateAttr=delegateAttr,**kw)

    def __repr__(self):
        # Show the documentation, if any, below the constructor-style form
        doc = self.__doc__
        if not doc:
            return "binding.Delegate(%r)" % (self.delegateAttr,)
        return "binding.Delegate(%r):\n\n%s" % (
            self.delegateAttr,doc
        )
class Require(Attribute):

    """Placeholder for a binding that should be (re)defined by a subclass"""

    # Class-level fallback for the per-instance setting
    description = ''

    def __init__(self, description="", metadata=None, **kw):
        # both settings travel with the other keywords to the base class
        kw['description'] = description
        kw['metadata'] = metadata
        _warnIfPermission(kw)
        super(Require,self).__init__(**kw)

    def computeValue(self, obj, instanceDict, attrName):
        # Only reached if nothing overrode or supplied the attribute
        details = (obj.__class__.__name__, attrName, self.description)
        raise NameError("Class %s must define %s; %s" % details)

    def __repr__(self):
        # Show the documentation, if any, below the constructor-style form
        doc = self.__doc__
        if not doc:
            return "binding.Require(%r)" % (self.description,)
        return "binding.Require(%r):\n\n%s" % (
            self.description,doc
        )
class _Base(object):

    """Basic attribute management and "active class" support"""

    __metaclass__ = ActiveClass

    protocols.advise(
        instancesProvide = [IBindableAttrs]
    )

    def _setBinding(self, attr, value, useSlot=False):
        # Store 'value' for 'attr', after announcing the change
        self._bindingChanging(attr, value, useSlot)

        if not useSlot:
            self.__dict__[attr] = value
        else:
            descr = getattr(self.__class__, attr)
            descr.__set__(self, value)

    def _getBinding(self, attr, default=None, useSlot=False):
        # Return the stored value of 'attr' (filtered through '_postGet()'),
        # or 'default' if there is none
        if useSlot:
            found = getattr(self, attr, default)
        else:
            found = self.__dict__.get(attr, default)

        if found is default:
            return found

        found = self._postGet(attr, found, useSlot)
        if found is NOT_FOUND:
            # the hook declared the value to be unavailable
            return default

        return found

    def _getBindingFuncs(klass, attr, useSlot=False):
        # Return the (get, set, delete) functions for 'attr': those of the
        # class' descriptor for slots, or of a '_proxy' otherwise
        if not useSlot:
            descr = _proxy(attr)
        else:
            descr = getattr(klass, attr)
        return descr.__get__, descr.__set__, descr.__delete__

    _getBindingFuncs = classmethod(_getBindingFuncs)

    def _delBinding(self, attr, useSlot=False):
        # Remove any stored value for 'attr', after announcing the change;
        # deleting a binding that isn't set is not an error
        self._bindingChanging(attr, NOT_FOUND, useSlot)

        if useSlot:
            delete = getattr(self.__class__, attr).__delete__
            try:
                delete(self)
            except AttributeError:
                pass    # slot was already empty
            return

        if attr in self.__dict__:
            del self.__dict__[attr]

    def _hasBinding(self,attr,useSlot=False):
        # Is there currently a stored value for 'attr'?
        if not useSlot:
            return attr in self.__dict__
        return hasattr(self, attr)

    def _bindingChanging(self,attr,newval,isSlot=False):
        # Hook called before a binding is set or deleted ('newval' is
        # 'NOT_FOUND' for deletion); does nothing by default
        pass

    def _postGet(self,attr,value,isSlot=False):
        # Hook filtering values returned by '_getBinding()'; returns the
        # value unchanged by default
        return value
class Component(_Base):

    """Thing that can be composed into a component tree, w/binding & lookups"""

    protocols.advise(
        classProvides = [IComponentFactory],
        instancesProvide = [IComponent]
    )

    def __init__(self, parentComponent=NOT_GIVEN, componentName=None, **kw):
        """Create a component, optionally attaching it to a parent

        'kw' supplies initial attribute values, which are applied using
        'initAttrs()' before the parent is set.  'setParentComponent()' is
        only called if a parent or a component name was actually given."""

        # Set up keywords first, so state is sensible
        if kw:
            initAttrs(self,kw.iteritems())

        # set our parent component and possibly invoke assembly events
        if parentComponent is not NOT_GIVEN or componentName is not None:
            self.setParentComponent(parentComponent,componentName)

    # The module-level function doubles as a method ('component' is 'self')
    lookupComponent = lookupComponent

    # NOTE: 'decorate()' applies to the function definition that follows it,
    # so the two must stay adjacent
    decorate(classmethod)
    def fromZConfig(klass, section):
        """Classmethod: Create an instance from a ZConfig 'section'"""

        # ZConfig uses unicode for keys and defaults unsupplied values to None
        data = dict([(str(k),v) for k,v in section.__dict__.items()
            if v is not None])

        # Drop these section attributes unless the class has attributes of
        # the same names
        for skip in '_name','_matcher','_attributes':
            if skip in data and not hasattr(klass,skip):
                del data[skip]

        return klass(**data)

    def setParentComponent(self, parentComponent, componentName=None,
        suggest=False):
        """Set this component's parent and name, if not already set

        If a parent setting already exists, the call is silently ignored
        when 'suggest' is true; otherwise 'AlreadyRead' is raised."""

        pc = self.__parentSetting

        if pc is NOT_GIVEN:
            self.__parentSetting = parentComponent
            self.__componentName = componentName
            self.__parentComponent   # lock and invoke assembly events
            return

        elif suggest:
            return

        raise AlreadyRead(
            "Component %r already has parent %r; tried to set %r"
            % (self,pc,parentComponent)
        )

    # Class-level defaults: no parent setting yet, and no name
    __parentSetting = NOT_GIVEN
    __componentName = None

    def __parentComponent(self,d,a):
        # Recipe for the parent binding below.  It resolves an unset parent
        # to 'None', stores the result in 'd[a]', and then triggers or
        # schedules assembly.

        parent = self.__parentSetting

        if parent is NOT_GIVEN:
            parent = self.__parentSetting = None

        d[a] = parent

        if parent is None:
            # no parent, so assembly can happen right away
            self.uponAssembly()

        elif (self.__class__.__attrsToBeAssembled__
            or self._getBinding('__objectsToBeAssembled__')):
            # we have something to assemble; ask the parent to call us back
            notifyUponAssembly(parent,self)

        return parent

    __parentComponent = Make(__parentComponent, suggestParent=False)

    def getParentComponent(self):
        """Return this component's parent (see 'setParentComponent()')"""
        return self.__parentComponent

    def getComponentName(self):
        """Return the name set via 'setParentComponent()', or 'None'"""
        return self.__componentName

    def _configKeysMatching(self, configKey):
        """Iterable over defined keys that match 'configKey'

        A key 'k' in the map is considered to "match" 'configKey' if any of
        'k.parentKeys()' are listed as keys in 'configKey.registrationKeys()'.
        You must not change the configuration map while iterating over the
        keys.  Also, keep in mind that only explicitly-registered keys are
        returned; for instance, load-on-demand rules will only show up as
        wildcard keys."""

        # Each key is yielded once, even if several maps define it
        yielded = {}

        for cMap in self._config_maps():
            for key in cMap._configKeysMatching(configKey):
                if key in yielded:
                    continue
                yield key
                yielded[key] = 1

    def _config_maps(self):
        # The configuration maps searched by '_configKeysMatching()'
        return [self.__class__.__class_offers__]

    def _getConfigData(self, forObj, configKey):
        # Return the value of the attribute offered for 'configKey' by this
        # class, or 'NOT_FOUND'.  'forObj' is not used here.
        attr = self.__class__.__class_offers__.lookup(configKey)

        if attr:
            return getattr(self, attr, NOT_FOUND)

        return NOT_FOUND

    def __class_offers__(klass,d,a):
        # Recipe: map each configuration key listed in a class descriptor's
        # 'offerAs' to that descriptor's attribute name, layered over the
        # '__class_offers__' registries obtained by 'getInheritedRegistries()'
        return ImmutableConfig(
            baseMaps = getInheritedRegistries(klass, '__class_offers__'),
            items = [(adapt(key,IConfigKey), attrName)
                for attrName, descr in klass.__class_descriptors__.items()
                    for key in getattr(descr,'offerAs',())
            ]
        )

    __class_offers__ = classAttr(Make(__class_offers__))

    def notifyUponAssembly(self,child):
        """Call 'child.uponAssembly()' when this component is assembled

        If assembly has already occurred, the child is notified at once."""

        tba = self.__objectsToBeAssembled__

        if tba is None:
            child.uponAssembly()    # assembly has already occurred

        else:
            tba.append(child)       # save reference to child for callback

            if (len(tba)==1 and self.__parentSetting is not NOT_GIVEN
                and not self.__class__.__attrsToBeAssembled__
            ):
                # Make sure our parent calls us, since we need to call a
                # child now, but would not have been registered ourselves.
                notifyUponAssembly(self.getParentComponent(),self)

    def uponAssembly(self):
        """Don't override this unless you can handle the reentrancy issues!"""

        tba = self.__objectsToBeAssembled__

        if tba is None:
            # already assembled (or assembly is currently in progress)
            return

        self.__objectsToBeAssembled__ = None

        try:
            while tba:
                ob = tba.pop()
                try:
                    ob.uponAssembly()
                except:
                    # put the child back so that it isn't lost, then re-raise
                    tba.append(ob)
                    raise

            # touch each attribute that is to be computed upon assembly
            for attr in self.__class__.__attrsToBeAssembled__:
                getattr(self,attr)

        except:
            # restore the pending list so the "unassembled" state is kept,
            # then re-raise
            self.__objectsToBeAssembled__ = tba
            raise

    # Children awaiting notification; 'None' once assembly has begun
    __objectsToBeAssembled__ = Make(list)

    def __attrsToBeAssembled__(klass,d,a):
        # Recipe: the inherited registries, plus the names of this class'
        # descriptors that have a true 'uponAssembly' attribute
        aa = {}
        map(aa.update, getInheritedRegistries(klass, '__attrsToBeAssembled__'))

        for attrName, descr in klass.__class_descriptors__.items():
            notify = getattr(descr,'uponAssembly',False)
            if notify: aa[attrName] = True

        return aa

    __attrsToBeAssembled__ = classAttr(Make(__attrsToBeAssembled__))
class Configurable(Component):

    """Component that also has a per-instance configuration map

    Instances provide 'IConfigurable'.  Configuration lookups consult the
    instance's own map first, then the attributes offered by its class."""

    protocols.advise(
        instancesProvide = [IConfigurable]
    )

    __instance_offers__ = Make(
        'peak.config.config_components:ConfigMap', offerAs=[IConfigurable]
    )

    def _getConfigData(self, forObj, configKey):
        # Instance-level registrations take precedence...
        found = self.__instance_offers__._getConfigData(forObj, configKey)

        if found is NOT_FOUND:
            # ...then fall back to attributes offered by the class
            attrName = self.__class__.__class_offers__.lookup(configKey)
            if attrName:
                found = getattr(self, attrName, NOT_FOUND)

        return found

    # 'registerProvider()' is handled by the instance's configuration map
    registerProvider = Delegate('__instance_offers__')

    def _config_maps(self):
        # class-level offers first, then this instance's own map
        klass = self.__class__
        return [klass.__class_offers__, self.__instance_offers__]