View of /PEAK/src/peak/config/config_components.py
Parent Directory
| Revision Log
Revision:
1848 -
(
download)
(
as text)
Mon Oct 11 21:38:07 2004 UTC (19 years, 6 months ago) by
pje
File size: 16401 byte(s)
Add more aggressive caching to 'config.Namespace()'. Add 'findLocation()'
to 'web.ITraversalContext' (without an implementation, as yet).
from __future__ import generators
from peak.api import *
from peak.util.imports import importString, importObject, whenImported
from peak.binding.components import Component, Make, getParentComponent
from peak.binding.components import iterParents, Configurable, Require
from peak.binding.interfaces import IAttachable, IRecipe
from peak.naming.interfaces import IStreamFactory
from peak.util.EigenData import EigenCell,AlreadyRead
from peak.util.FileParsing import AbstractConfigParser
from registries import FactoryFor
from interfaces import *
from protocols.advice import getMRO, determineMetaclass
# Names exported by 'from <this module> import *'
__all__ = [
    'ConfigMap',
    'LazyRule',
    'fileNearModule',
    'packageFile',
    'IniLoader',
    'Value',
    'iterKeys',
    'Namespace',
    'iterValues',
    'CreateViaFactory',
    'parentsProviding',
    'parentProviding',
    'lookup',
    'ServiceArea',
]
def _setCellInDict(d,key,value):
cell = d.get(key)
if cell is None:
cell = d[key] = EigenCell()
cell.set(value)
# Shared placeholder cell whose rule returns 'NOT_FOUND' for any arguments.
# 'ConfigMap._getConfigData()' installs it (via 'setdefault') under names that
# had no rule at lookup time, and recognizes it by identity so that it can
# skip calling it.
_emptyRuleCell = EigenCell()
_emptyRuleCell.set(lambda *args: NOT_FOUND)
# NOTE(review): presumably 'exists()' counts as reading the cell, so that it
# can no longer be changed afterwards -- confirm against 'EigenCell'.
_emptyRuleCell.exists()
def fileNearModule(moduleName,filename):
    """DEPRECATED: please switch to 'config.packageFile()' or a URL

    Return the path formed by joining 'filename' to the directory that
    contains the '__file__' of the module named 'moduleName'."""

    import os

    moduleFile = importString(moduleName+':__file__')
    moduleDir = os.path.dirname(moduleFile)
    return os.path.join(moduleDir, filename)
def packageFile(moduleName,filename):
    """Return 'naming.IStreamFactory' for 'filename' in 'moduleName' package

    'filename' is a '/'-separated path relative to the directory of the
    module.  If the module has a '__loader__' that offers 'get_data', an
    'ImportLoaderFactory' wrapping that loader is returned; otherwise a
    'FileFactory' for the corresponding filesystem path is returned."""

    module = importString(moduleName)

    if hasattr(module, '__loader__'):
        loader = module.__loader__
        if hasattr(loader, 'get_data'):
            # Let the module's own loader fetch the data
            from peak.naming.factories.openable import ImportLoaderFactory
            return ImportLoaderFactory(loader, moduleName, filename)

    # Ordinary module: build a native path next to the module's file
    from peak.naming.factories.openable import FileFactory
    import os.path

    pathParts = filename.split('/')
    moduleDir = os.path.dirname(module.__file__)
    fullPath = os.path.join(moduleDir, *pathParts)
    return FileFactory(filename=fullPath)
class StreamSource(protocols.Adapter):

    """Adapt a 'str' or 'unicode' filename/URL to 'IStreamSource'"""

    protocols.advise(
        instancesProvide=[IStreamSource],
        asAdapterForTypes=[str,unicode]
    )

    def getFactory(self, context):
        """Return a stream factory for the adapted string

        The string is first converted with 'FileURL.fromFilename()'; if that
        raises 'exceptions.InvalidName', it is parsed with 'naming.toName()'
        instead.  A resulting 'FileURL' yields a 'FileFactory' for its
        filename; any other name is looked up relative to 'context'."""

        from peak.naming.factories.openable import FileFactory,FileURL

        text = self.subject

        try:
            url = FileURL.fromFilename(text)
        except exceptions.InvalidName:
            url = naming.toName(text, FileURL.fromFilename)

        if not isinstance(url,FileURL):
            return naming.lookup(context,url)

        return FileFactory(filename=url.getFilename())
class FactorySource(protocols.Adapter):

    """Adapt an 'IStreamFactory' to 'IStreamSource'"""

    protocols.advise(
        instancesProvide = [IStreamSource],
        asAdapterForProtocols = [IStreamFactory],
    )

    def getFactory(self, context):
        """Return the adapted factory itself; 'context' is not used"""
        factory = self.subject
        return factory
def iterValues(component, configKey):
    """Return iterator over all values of 'configKey' for 'component'

    Each object yielded by 'iterParents(component)' that has a
    '_getConfigData' attribute is asked for the key on behalf of 'component';
    results other than 'NOT_FOUND' are yielded in that order.

    If the iteration is run to completion, the last object visited is adapted
    to 'IConfigurationRoot' (falling back to 'NullConfigRoot') and its
    'noMoreValues()' method is called."""

    key = adapt(configKey,IConfigKey)

    # If 'iterParents()' yields nothing, 'last' remains 'component' itself
    last = component

    for last in iterParents(component):

        try:
            getData = last._getConfigData
        except AttributeError:
            # Not a configuration source; move on to the next parent
            continue

        found = getData(component, key)

        if found is NOT_FOUND:
            continue

        yield found

    root = adapt(last, IConfigurationRoot, NullConfigRoot)
    root.noMoreValues(last, key, component)
def lookup(component, configKey, default=NOT_GIVEN):
    """Return value for 'configKey' in context of 'component', or 'default'

    The first value produced by 'iterValues()' is returned.  If there is
    none, 'default' is returned when one was supplied; otherwise
    'exceptions.NameNotFound' is raised."""

    for firstValue in iterValues(component, configKey):
        return firstValue

    if default is not NOT_GIVEN:
        return default

    raise exceptions.NameNotFound(configKey, resolvedObj = component)
def parentsProviding(component, protocol):
    """Iterate over all parents of 'component' that adapt to 'protocol'

    The items yielded are the adapted forms of the objects produced by
    'iterParents(component)', not necessarily those objects themselves."""

    for candidate in iterParents(component):

        adapted = adapt(candidate,protocol,None)

        if adapted is None:
            continue

        yield adapted
def parentProviding(component, protocol, default=NOT_GIVEN):
    """Return first parent providing 'protocol' for 'component', or 'default'

    If 'parentsProviding()' yields nothing, 'default' is returned when one
    was supplied; otherwise 'exceptions.NameNotFound' is raised."""

    for provider in parentsProviding(component, protocol):
        return provider

    if default is not NOT_GIVEN:
        return default

    raise exceptions.NameNotFound(protocol, resolvedObj = component)
def iterKeys(component, configKey):
    """Iterate sub-keys of 'configKey' that are available from 'component'

    Every parent of 'component' that adapts to 'IConfigSource' is asked for
    its matching keys; each distinct key is yielded once, the first time it
    is encountered."""

    seen = {}   # used as a set of keys already yielded

    for source in parentsProviding(component,IConfigSource):
        for subKey in source._configKeysMatching(configKey):
            if subKey not in seen:
                seen[subKey] = 1
                yield subKey
class CreateViaFactory(object):

    """'IRule' for one-time creation of target interface using FactoryFor()"""

    protocols.advise(
        classProvides=[IRule]
    )

    __slots__ = 'configKey'


    def __init__(self,configKey):
        """Remember 'configKey' (adapted to 'IConfigKey') as the service key"""
        self.configKey = adapt(configKey,IConfigKey)


    def __call__(self, propertyMap, configKey, targetObj):
        """Return the service for 'self.configKey' from the nearest service area

        The service area is the first parent of 'targetObj' providing
        'IServiceArea'.  It is handed a creation callback that looks up the
        'FactoryFor(self.configKey)', calls it, and suggests the service
        area as the new instance's parent component."""

        area = parentProviding(targetObj, IServiceArea)

        def makeInstance():
            factory = lookup(area, FactoryFor(self.configKey))

            # NOTE(review): 'lookup()' is called without a default here, so a
            # missing factory appears to raise 'NameNotFound' rather than
            # return 'NOT_FOUND'; this branch looks unreachable -- confirm.
            if factory is NOT_FOUND:
                return NOT_FOUND

            created = factory()
            binding.suggestParentComponent(area, None, created)
            return created

        return area.getService(self.configKey, makeInstance)
class ConfigMap(Component):

    """Component that stores configuration rules and provides 'IConfigurable'"""

    # rules:             registration key -> cell holding the provider
    # depth:             registration key -> depth of the stored provider
    # keyIndex:          parent key -> {registered key: True}
    # lockedNamespaces:  key -> True, once '_configKeysMatching()' has
    #                    examined it
    rules = depth = keyIndex = lockedNamespaces = Make(dict)

    protocols.advise(
        instancesProvide=[IConfigurable]
    )


    def registerProvider(self, configKey, provider):

        """Register 'provider' under 'configKey'

        For each '(key,depth)' pair of 'configKey.registrationKeys()', the
        provider is stored unless a provider is already recorded for 'key'
        at a smaller depth.  'AlreadyRead' is raised if any parent key of
        'key' has already been examined by '_configKeysMatching()'."""

        registrations = adapt(configKey, IConfigKey).registrationKeys()

        for key, depth in registrations:

            if self.depth.get(key, depth) < depth:
                # The provider we already have is better than the new one
                continue

            locked = self.lockedNamespaces
            ckey = adapt(key, IConfigKey)

            # Check every parent namespace before changing anything
            for parent in ckey.parentKeys():
                if parent in locked:
                    raise AlreadyRead(
                        "A namespace containing %r has already been examined"
                        % (configKey,)
                    )

            for parent in ckey.parentKeys():
                self.keyIndex.setdefault(parent, {})[ckey] = True

            _setCellInDict(self.rules, key, provider)
            self.depth[key] = depth


    def _configKeysMatching(self, configKey):

        """Iterable over defined keys that match 'configKey'

        A key 'k' in the map is considered to "match" 'configKey' if any of
        'k.parentKeys()' are listed as keys in 'configKey.registrationKeys()'.

        You must not change the configuration map while iterating over the
        keys.  Also, keep in mind that only explicitly-registered keys are
        returned; for instance, load-on-demand rules will only show up as
        wildcard keys."""

        index = self._getBinding('keyIndex')

        if index:
            wanted = adapt(configKey,IConfigKey).registrationKeys()

            for key, depth in wanted:
                # Record that this namespace has now been examined
                self.lockedNamespaces[key] = True

                for match in index.get(key, ()):
                    yield match


    def _getConfigData(self, forObj, configKey):

        """Look up the requested value

        The rules registered under each of 'configKey.lookupKeys()' are
        tried in order; the first result other than 'NOT_FOUND' is returned,
        or 'NOT_FOUND' if no rule produced one."""

        rules = self.rules
        result = NOT_FOUND
        unspecified = []    # names that had no rule at all when examined

        for name in configKey.lookupKeys():

            cell = rules.get(name)

            if cell is None:
                unspecified.append(name)
                continue

            if cell is _emptyRuleCell:
                continue

            result = cell.get()(self, configKey, forObj)

            if result is not NOT_FOUND:
                break

        # ensure that unspecified rules stay that way, if they
        # haven't been replaced in the meanwhile by a higher-level
        # wildcard rule
        for name in unspecified:
            rules.setdefault(name, _emptyRuleCell)

        return result
def Value(v):
    """Return an 'IRule' that always returns 'v'

    The returned callable accepts any positional arguments and ignores
    them."""

    def constantRule(*args):
        return v

    return constantRule
class LazyRule(object):

    """Rule that calls a loader function the first time it is invoked

    Later invocations return 'NOT_FOUND' without calling the loader again,
    unless the loader raised an error, in which case it will be retried."""

    # Class-level default; shadowed by an instance attribute once loading
    # has been attempted
    loadNeeded = True

    def __init__(self, loadFunc, prefix='*', **kw):
        """Save 'loadFunc' and 'prefix'; extra keywords become attributes"""
        self.load = loadFunc
        self.prefix = prefix
        self.__dict__.update(kw)

    def __call__(self, propertyMap, propName, targetObj):
        """Return 'self.load(propertyMap, self.prefix, propName)', once only"""

        if not self.loadNeeded:
            return NOT_FOUND

        try:
            self.loadNeeded = False
            return self.load(propertyMap, self.prefix, propName)
        except:
            # Deliberately catches everything: drop the instance flag so the
            # class default applies again and the load can be retried, then
            # let the original error propagate unchanged
            del self.loadNeeded
            raise
from peak.naming.interfaces import IState
class NamingStateAsSmartProperty(protocols.Adapter):

    """Adapt a naming 'IState' to 'ISmartProperty'"""

    protocols.advise(
        instancesProvide = [ISmartProperty],
        asAdapterForProtocols = [IState],
    )

    def computeProperty(self, propertyMap, name, prefix, suffix, targetObject):

        """Restore the adapted state and return the resulting property value

        The state is restored in a 'PropertyContext' for 'targetObject',
        using 'suffix' as the name.  If the restored object itself adapts to
        'ISmartProperty', its 'computeProperty()' result is returned instead
        of the object."""

        from peak.naming.factories.config_ctx import PropertyPath
        from peak.naming.factories.config_ctx import PropertyContext

        contextName = PropertyPath(prefix[:-1])   # strip any trailing '.'

        ctx = PropertyContext(
            targetObject,
            creationParent = targetObject,
            nameInContext = contextName,
        )

        restored = self.subject.restore(ctx, PropertyPath(suffix))
        smart = adapt(restored, ISmartProperty, None)

        if smart is None:
            return restored

        return smart.computeProperty(
            propertyMap, name, prefix, suffix, targetObject
        )
class IniLoader(Configurable):

    """Component that lazily loads its configuration from .ini file(s)"""

    protocols.advise(
        classProvides=[naming.IObjectFactory],
    )

    def __instance_offers__(self,d,a):
        # Create the instance's 'ConfigMap', store it under 'd[a]', and fill
        # it with defaults before returning it
        configMap = ConfigMap(self)
        d[a] = configMap
        self.setupDefaults(configMap)
        return configMap

    __instance_offers__ = Make(
        __instance_offers__, offerAs=[IConfigurable], uponAssembly = True
    )

    iniFiles = Require("Sequence of filenames/URLs/factories to load")


    def setupDefaults(self, propertyMap):

        """Set up 'propertyMap' with default contents loaded from 'iniFiles'

        Items of 'iniFiles' that are tuples are treated as arguments for
        'packageFile()'; everything else is passed to
        'config.loadConfigFile()' as-is."""

        for source in self.iniFiles:

            if isinstance(source,tuple):
                # XXX do we really want to continue supporting this, now that
                # XXX you can call packageFile directly?
                source = packageFile(*source)

            config.loadConfigFile(propertyMap, source)


    def getObjectInstance(klass, context, refInfo, name, attrs=None):
        """Create an instance whose 'iniFiles' are 'refInfo.addresses'"""
        addresses = refInfo.addresses
        return klass(iniFiles = addresses)

    getObjectInstance = classmethod(getObjectInstance)
class ServiceArea(Configurable):

    """Component that acts as a home for "global"-ish services"""

    protocols.advise(
        instancesProvide=[IServiceArea]
    )

    # Per-instance store of services, keyed by rule key
    __services = binding.Make('peak.util.EigenData:EigenDict')

    def getService(self,ruleKey,factory):
        """Return the service stored under 'ruleKey'

        'factory' is handed to the underlying store's 'get()' together with
        a 'NOT_FOUND' default."""
        services = self.__services
        return services.get(ruleKey, NOT_FOUND, factory=factory)
class ConfigurationRoot(IniLoader, ServiceArea):

    """Default implementation for a configuration root.

    If you think you want to subclass this, you're probably wrong.  Note that
    you can have whatever setup code you want, called automatically from .ini
    files loaded by this class.  We recommend you try that approach first."""

    protocols.advise(
        instancesProvide=[IConfigurationRoot]
    )

    # Evaluated once, when the class body is executed
    iniFiles = (
        packageFile('peak','peak.ini'),
    )

    def noMoreValues(self,root,configKey,forObj):
        """Do nothing when a value search is exhausted"""

    def nameNotFound(self,root,name,forObj):
        """Fall back to a naming lookup of 'name' relative to 'forObj'"""
        return naming.lookup(forObj, name, creationParent=forObj)
class Namespace(object):

    """Traverse to another property namespace

    Use this in .ini files (e.g. '__main__.* = config.Namespace("environ.*")')
    to create a rule that looks up undefined properties in another property
    namespace.

    Or, use this as a way to treat a property namespace as a mapping object::

        myNS = config.Namespace("some.prefix", aComponent)
        myNS['spam.bayes']              # property 'some.prefix.spam.bayes'
        myNS.get('something',default)   # property 'some.prefix.something'

    Or use this in a component class to allow traversing to a property space::

        class MyClass(binding.Component):

            appConfig = binding.Make(
                config.Namespace('MyClass.conf')
            )

            something = binding.Obtain('appConfig/foo.bar.baz')

    In the example above, 'something' will be the component's value for the
    property 'MyClass.conf.foo.bar.baz'.  Note that you may not traverse to
    names beginning with an '_', and traversing to the name 'get' will give you
    the namespace's 'get' method, not the 'get' property in the namespace.  To
    obtain the 'get' property, or properties beginning with '_', you must use
    the mapping style of access, as shown above.

    NOTE: By default, 'Namespace' instances cache every key that's looked up in
    them.  If you are holding a reference to a namespace, and you expect an
    unbounded number of potential lookups, do not want references held to the
    results, or are looking up dynamically changing or dynamically created
    properties, you should disable caching via the 'cache=False' keyword arg.
    Sub-namespaces obtained by calling a namespace inherit its cache setting."""

    def __init__(self, prefix, target=NOT_GIVEN, cache=True):
        """Create a namespace for 'prefix', optionally bound to 'target'

        'target' is the component whose configuration is searched; while it
        is 'NOT_GIVEN', lookups just return their default.  'cache' controls
        whether values that are found get remembered."""
        self._prefix = PropertyName(prefix).asPrefix()
        self._target = target
        self._cache = cache
        self._data = {}     # key -> value found; only filled when caching


    def __call__(self, suffix):
        """Return a sub-namespace for 'suffix'"""
        # Pass the cache flag along: without it, a 'cache=False' namespace
        # would hand out sub-namespaces that silently cache everything
        return self.__class__(
            PropertyName.fromString(self._prefix+suffix),
            self._target,
            self._cache
        )


    def __getattr__(self, attr):
        """Return property 'attr' of this namespace as an attribute

        Names beginning with '_' are never looked up.  Raises
        'AttributeError' if the property is not found."""

        if not attr.startswith('_'):

            ob = self.get(attr, NOT_FOUND)

            if ob is not NOT_FOUND:
                # Never store over a name that the class itself defines
                if self._cache and not hasattr(self.__class__,attr):
                    setattr(self,attr,ob)   # Cache for future use
                return ob

        raise AttributeError(attr)


    def __getitem__(self, key):
        """Return the value of property 'key' within this namespace

        Raises 'KeyError' if the property is not found."""

        ob = self.get(key,NOT_FOUND)

        if ob is not NOT_FOUND:
            return ob

        raise KeyError(key)


    def get(self,key,default=None):
        """Return property 'key' within this namespace, or 'default'

        If the namespace has no target yet, 'default' is returned without
        any lookup.  A result that is the 'default' object itself is not
        cached."""

        if self._target is NOT_GIVEN:
            return default

        if key in self._data:
            return self._data[key]

        result = lookup(
            self._target,PropertyName.fromString(self._prefix+key),default
        )

        if self._cache and result is not default:
            self._data[key] = result

        return result


    def __repr__(self):
        return "config.Namespace(%r,%r)" % (self._prefix,self._target)


    def keys(self):
        """Return a list of the keys defined within this namespace

        The list is empty if the namespace has no target.  Keys are given
        relative to the namespace's prefix; a trailing '?' is stripped, and
        keys ending in '*' are left out."""

        items = []

        if self._target is not NOT_GIVEN:

            prel = len(self._prefix)
            append = items.append
            yielded = {}    # stripping '?' can produce duplicates

            for key in iterKeys(self._target,self._prefix+'*'):

                key = key[prel:]

                if key.endswith('?'):
                    key = key[:-1]
                elif key.endswith('*'):
                    continue

                if key not in yielded:
                    append(key)
                    yielded[key]=1

        return items
class __NamespaceExtensions(protocols.Adapter):

    """Adapt 'Namespace' to 'ISmartProperty', 'IAttachable' and 'IRecipe'"""

    protocols.advise(
        instancesProvide = [ISmartProperty, IAttachable, IRecipe],
        asAdapterForTypes = [Namespace]
    )


    def computeProperty(self, propertyMap, name, prefix, suffix, targetObject):
        """Look up 'suffix' under the namespace's prefix for 'targetObject'

        Returns 'NOT_FOUND' if the property is not defined."""
        fullName = self.subject._prefix+suffix
        return config.lookup(targetObject, fullName, default=NOT_FOUND)


    def setParentComponent(self, parentComponent, componentName=None,
        suggest=False
    ):
        """Make 'parentComponent' the namespace's target, if it has none

        When a target is already set, a mere suggestion is ignored; anything
        else raises 'AlreadyRead'."""

        namespace = self.subject
        current = namespace._target

        if current is NOT_GIVEN:
            namespace._target = parentComponent

        elif not suggest:
            raise AlreadyRead(
                "%r already has target %r; tried to set %r"
                % (namespace,current,parentComponent)
            )


    def __call__(self,client,instDict,attrName):
        """Return a copy of the namespace whose target is 'client'

        The copy keeps the original's prefix (minus the trailing character
        of its stored form) and its cache setting."""
        original = self.subject
        barePrefix = original._prefix[:-1]
        return original.__class__(barePrefix, client, original._cache)