# [Subversion] / EccoChemistry / ecco_chemistry.py
#
# View of /EccoChemistry/ecco_chemistry.py
#
# Parent Directory | Revision Log
# Revision: 2612 - (download) (as text)
# Fri May 28 06:24:09 2010 UTC (13 years, 11 months ago) by pje
# File size: 18649 byte(s)
# Add .depth attribute to folders, iter/contains on folder types
# (so you can e.g. find all numeric folders or all date folders)
from ecco_dde import *
from peak.util.decorators import decorate, classy
import datetime, operator
from decimal import Decimal

# Module-wide EccoDDE instance; every ``Ecco.*`` call in this module goes
# through it.
Ecco = EccoDDE()

# Names exported by a star-import of this module.
__all__ = [
    'Ecco', 'Item', 'CheckmarkFolder', 'TextFolder', 'PopupFolder',
    'DateFolder', 'NumericFolder', 'Folder', 'Parent', 'Children'
]

def intersect(first, second, *rest):
    """Yield the items of `first` that also occur in `second` and each `rest`

    The order (and any duplicates) of `first` are preserved.  The other
    iterables are only used for membership tests, so their items must be
    hashable.  Nothing is consumed until the generator is first advanced.
    """
    common = set(second)
    for other in rest:
        common.intersection_update(other)
    for candidate in first:
        if candidate not in common:
            continue
        yield candidate

def union(*sequences):
    """Yield each distinct item of `sequences`, in first-seen order

    Items must be hashable; an item is yielded only the first time it is
    encountered across all of the given sequences.
    """
    emitted = set()
    for seq in sequences:
        for item in seq:
            if item in emitted:
                continue
            emitted.add(item)
            yield item














class _ItemChildren(object):
    """Container for an item's children

    `itemtype` is the item class the children are looked up as, `parentid`
    is the Ecco id of the parent item, and `depth` is passed straight to
    ``Ecco.GetItemSubs()`` (``Item.all_children`` uses 0 -- presumably
    "no depth limit"; confirm against the EccoDDE documentation).
    """

    def __init__(self, itemtype, parentid, depth=1):
        self.itemtype = itemtype
        self.parentid = parentid
        self.depth = depth

    def __iter__(self):
        """Yield the sub-items that match `itemtype` (others are skipped)"""
        for depth, id in Ecco.GetItemSubs(self.parentid, self.depth):
            cls = _find_item_subclass(self.itemtype, id)
            if cls is not None: yield cls(id, __class__=cls)

    def __nonzero__(self):
        """True if there is at least one matching child"""
        for sub in self: return True
        return False

    def __len__(self):
        return len(list(iter(self)))    # iter prevents recursion

    def __contains__(self, item):
        """Is `item` one of the children reachable within `depth` levels?"""
        if isinstance(item, self.itemtype):
            parents = Ecco.GetItemParents(int(item))
            # The immediate parent is the *last* entry (Parent.__get__ relies
            # on parents[-1]), so the ancestors within `depth` levels are the
            # last `depth` entries.  A depth of 0 gives parents[0:], i.e. every
            # ancestor.  (The previous slice, parents[:-depth], dropped the
            # immediate parent and so never matched a direct child.)
            return self.parentid in parents[-self.depth:]
        return False

    def extend(self, items):
        """Add `items` after the parent's current last direct child

        The ids are handed to ``Ecco.InsertItem()`` in reverse order; when the
        parent has no children yet they are inserted under the parent itself.
        """
        items = [int(i) for i in items][::-1]
        subs = Ecco.GetItemSubs(self.parentid, 1)
        if subs:
            # subs holds (depth, id) pairs; anchor on the last child's id
            Ecco.InsertItem(subs[-1][1], items, InsertLevel.Same)
        else:
            Ecco.InsertItem(self.parentid, items)

    def append(self, item):
        """Add a single `item` after the current last child"""
        self.extend([item])

    def prepend(self, item):
        """Insert `item` directly under the parent

        Presumably ``Ecco.InsertItem()`` places it as the first child --
        confirm against the EccoDDE documentation.
        """
        Ecco.InsertItem(self.parentid, int(item))


class Children(object):
    """Property for the child items (of a given type) of an item

    `itemtype` restricts the children to one item class; when it is None the
    class of the owning item is used instead.  `depth` is forwarded to
    the ``_ItemChildren`` container returned on attribute access.
    """

    def __init__(self, itemtype=None, depth=1):
        assert itemtype is None or isinstance(itemtype, ItemClass)
        self.itemtype = itemtype
        self.depth = depth

    def __get__(self, ob, typ):
        """Return the children container (or this property, on the class)"""
        if ob is None:
            return self
        return _ItemChildren(self.itemtype or typ, int(ob), self.depth)

    def __set__(self, ob, value):
        """Replace the direct children of `ob` with the items in `value`"""
        value = list(value)[::-1]
        # Fall back to the item's own class exactly as __get__ does; passing
        # a None itemtype to _ItemChildren makes the subclass lookup fail as
        # soon as `ob` has any existing child.
        it = self.itemtype or type(ob)
        # unlink children
        Ecco.InsertItem(0, [
            int(i) for i in _ItemChildren(it, int(ob), 1) if i!=ob
        ])
        if value:
            Ecco.InsertItem(int(ob), [int(it(i)) for i in value if i!=ob])

    def __delete__(self, ob):
        """Unlink all direct children of `ob`"""
        self.__set__(ob, ())
















class Parent(object):
    """Property giving access to an item's parent item

    `itemtype` optionally restricts the class the parent is looked up as;
    when it is None the class of the owning item is used.
    """

    def __init__(self, itemtype=None):
        assert itemtype is None or isinstance(itemtype, ItemClass)
        self.itemtype = itemtype

    def __get__(self, ob, typ):
        """Return the parent item, or None if there is no matching parent"""
        if ob is None:
            return self
        ancestors = Ecco.GetItemParents(int(ob))
        if not ancestors:
            return None
        # the immediate parent is the last entry of the list
        nearest = ancestors[-1]
        found = _find_item_subclass(self.itemtype or typ, nearest)
        if not found:
            return None
        return found(nearest, __class__ = found)

    def __set__(self, ob, value):
        """Move `ob` under `value`; None re-inserts it under id 0"""
        parent = 0
        if value is not None:
            assert self.itemtype is None or isinstance(value, self.itemtype)
            parent = int(value)
        Ecco.InsertItem(parent, int(ob))

    def __delete__(self, ob):
        """Same as assigning None"""
        self.__set__(ob, None)














class ItemClass(type(classy)):
    """Metaclass for item classes: query operators on the class itself

    Every operation is forwarded to the class's ``__container__``, so an item
    class can be iterated, filtered and sorted directly.  The two-letter
    criteria codes are passed through unchanged to the container's query.
    """

    __container__ = None

    def _query(self, *criteria):
        """Delegate a query with `criteria` to this class's container"""
        container = self.__container__
        return container._query(*criteria)

    def __iter__(self):
        """Iterate over the unfiltered query for this class"""
        everything = self._query()
        return iter(everything)

    def startswith(self, value):
        """Query using the "IB" criterion with `value`"""
        criteria = ("IB", value)
        return self._query(*criteria)

    def with_text(self, value):
        """Query using the "IC" criterion with `value`"""
        criteria = ("IC", value)
        return self._query(*criteria)

    def without_text(self, value):
        """Query using the "IN" criterion with `value`"""
        criteria = ("IN", value)
        return self._query(*criteria)

    def __pos__(self):
        """``+cls``: query using the "ia" criterion"""
        return self._query(*("ia",))

    def __neg__(self):
        """``-cls``: query using the "id" criterion"""
        return self._query(*("id",))


# folder id -> the single-bit mask assigned to that folder
_folder_bits = {}
# next unassigned bit; shifted left each time a new folder gets a mask
_folder_bit = 1

def _folder_mask(fid):
    """Return the bit mask for folder `fid`, assigning a new bit if needed"""
    global _folder_bit
    mask = _folder_bits.get(fid)
    if mask is None:
        mask = _folder_bits[fid] = _folder_bit
        _folder_bit = _folder_bit << 1
    return mask



class Item(classy, int):
    """Base class for Ecco items

    An Item is an ``int`` whose value is the Ecco item id.  Class attributes
    that are ``Container`` objects (as produced by accessing a ``Folder``
    on the class) are treated as folder values; `required_values` and
    `default_values` map such attribute names to values and are merged with
    those of the base classes when a subclass is created.
    """
    __metaclass__ = ItemClass
    __slots__ = ()  # XXX should keep in subclasses
    def __new__(cls, id_or_text, **kw):
        """Create a new Ecco item or wrap an existing one

        If `id_or_text` is a string, a new item with that text is created via
        ``Ecco.CreateItem()``; its folder values are `default_values`
        overridden by `kw`.  Otherwise `id_or_text` is taken as an existing
        item id, and any folder values in `kw` are written to it.  Keywords
        that are not folder attributes are set with ``setattr()`` on the
        result.  A ``__class__`` keyword skips the subclass lookup.
        """
        vals = attrs = ()
        if isinstance(id_or_text, basestring):
            d = cls.default_values.copy()
            d.update(kw)
            if d: vals, attrs, extra = cls._attrvalues(d)
            # required=True: raises TypeError if no class matches the values
            cls = _find_item_subclass(cls, None, vals, True)
            id_or_text = Ecco.CreateItem(id_or_text,vals)
        else:
            # NOTE: kw may still hold '__class__' here, in which case it is
            # returned among `attrs` and re-assigned by the setattr loop below
            if kw: vals, attrs, extra = cls._attrvalues(kw)
            if '__class__' in kw:
                cls = kw.pop('__class__')   # fast path for collections
            else:
                cls = _find_item_subclass(cls, id_or_text, vals, True)
            # vals is [(fid, value), ...]; zip(*vals) splits it into the
            # (fids, values) pair of arguments
            if vals: Ecco.SetFolderValues(id_or_text, *zip(*vals))
        self = super(Item, cls).__new__(cls, id_or_text)
        if attrs:
            for k, v in attrs: setattr(self, k, v)
        return self

    # attribute name -> value an item must have to be of this class
    required_values = dict()
    # attribute name -> value given to newly created items
    default_values = dict()
    # the Ecco item id as a plain int
    id = property(int)

    # Deliberately takes no `self`: __class_init__ reads the argument names
    # of this function, looks up the same-named class attributes, and calls
    # it with the decoded folder values when validating an item.
    def _check_fields():
        return True

    def text(self):
        """The item's text, read from and written to Ecco on each access"""
        return Ecco.GetItemText(int(self))
    def _set_text(self, value):
        Ecco.SetItemText(int(self), value)

    text = property(text, _set_text)

    def __repr__(self):
        return "%s(%d)" % (self.__class__.__name__, int(self))

    def __class_init__(cls, name, bases, cdict, supr):
        """Compute the per-class matching data used by _find_item_subclass()

        Presumably invoked by `classy` when a subclass is created.  Sets
        `default_values`, `_required_values`, `_folder_mask`,
        `_exclusion_mask` and `_validate_fields` on `cls`.
        """
        supr()(cls, name, bases, cdict, supr)
        # required values double as defaults, unless overridden below
        defaults = cls.required_values.copy()
        required = {}
        # reversed, so that values from earlier-listed bases win
        for base in bases[::-1]:
            if isinstance(base, ItemClass):
                defaults.update(base.default_values)
                required.update(base.required_values)

        defaults.update(cls.default_values)
        cls.default_values = defaults
        required.update(cls.required_values)
        if cls.__container__:
            # items of a class with a container must be in that folder
            required.setdefault('__container__', None)
            if isinstance(cls.__container__.folder, CheckmarkFolder):
                defaults.setdefault('__container__',True)

        vals, attrs, extra = cls._attrvalues(required)
        assert not attrs    # XXX error message
        cls._exclusion_mask = 0
        # folder id -> encoded value required of the item
        required = cls._required_values = dict(vals)
        # `extra` holds the same folders with their unencoded values
        for f, v in extra:
            if v is False:
                # the item must NOT be in this folder
                cls._exclusion_mask |= _folder_mask(f)
                del required[f]
            elif v is None:
                # the item must be in the folder; any value is accepted
                required[f] = None
            elif v is True and cls.__container__ is None:
                cls.__container__ = Folder(f)

        # bits of every folder an item must be in to match this class
        cls._folder_mask = reduce(operator.or_, map(_folder_mask, required), 0)
        checker = cls._check_fields.im_func  # XXX error handling
        decoders = []
        code = checker.func_code
        # positional argument names of _check_fields
        names = code.co_varnames[:code.co_argcount]
        assert not checker.func_defaults    # XXX error message
        # NOTE(review): zip() pairs the names with the *keys* of `defaults`
        # and `default` is never used; the only visible effect is that the
        # loop stops after len(defaults) names -- confirm this is intended.
        for attr, default in zip(names, defaults):
            folder = getattr(cls, attr).folder  # XXX error handling
            _folder_mask(folder.id) # ensure the value will be retrieved
            decoders.append((folder.id, folder.decode))

        def _validate_fields(values):
            return True

        if decoders:
            # `values` maps folder id -> raw value; missing folders are
            # decoded from None
            def _validate_fields(values, decoders=decoders):
                vget = values.get
                return checker(*[d(vget(f)) for f,d in decoders])

        cls._validate_fields = staticmethod(_validate_fields)

    decorate(classmethod)
    def _attrvalues(cls, d):
        """Split the attribute mapping `d` into folder and plain values

        Returns ``(vals, attrs, original)``: `vals` is a list of
        ``(folder id, encoded value)`` for attributes that are Containers,
        `original` holds the same folders with their unencoded values, and
        `attrs` is a list of ``(name, value)`` for all other attributes.
        Raises TypeError for a name that is not an attribute of `cls`.
        """
        vals, attrs, original = [], [], []
        for k, v in d.items():
            if not hasattr(cls, k):
                raise TypeError("No such attribute: ", k)
            descr = getattr(cls, k)
            if isinstance(descr, Container):
                fid = descr.folder.id
                vals.append((fid, descr.encode(v)))
                original.append((fid, v))
            else:
                attrs.append((k, v))
        return vals, attrs, original

    def update(self, **kw):
        """Set multiple attributes at once"""
        vals, attrs, extra = self._attrvalues(kw)
        # folder values are written in a single SetFolderValues() call
        if vals: Ecco.SetFolderValues(int(self), *zip(*vals))
        for k, v in attrs: setattr(self, k, v)











    decorate(classmethod)
    def upgrade(cls, itemid, **kw):
        """Upgrade `itemid` to this class by initializing required values"""
        # folders the item is already in
        fids = dict.fromkeys(Ecco.GetItemFolders(itemid))
        d = cls.default_values.copy()
        for k, v in d.items():
            if v is not None:
                descr = getattr(cls, k)
                # don't overwrite a folder value the item already has
                if isinstance(descr, Container) and descr.folder.id in fids:
                    del d[k]
        d.update(kw)
        return cls(itemid, **d)

    parent = Parent()
    children = Children()
    all_children = Children(depth=0)

























class Container(object):
    """Query for the items of `itemtype` found in `folder`

    `criteria` is a flat tuple of query arguments handed to
    ``Ecco.GetFolderItems()``.  Comparison operators and the text/sort
    helpers never modify a container; each returns a new Container with the
    extra criteria appended.
    """

    def __init__(self, itemtype, folder, criteria=()):
        self.itemtype = itemtype
        self.folder = folder
        self.criteria = criteria
        self.encode = folder.encode

    def _query(self, *criteria):
        """Return a copy of this container with `criteria` appended"""
        combined = self.criteria + criteria
        return Container(self.itemtype, self.folder, combined)

    def _compare(self, op, value):
        """Query on comparison code `op` against the encoded `value`"""
        return self._query(op, self.encode(value))

    def __iter__(self):
        """Yield the matching items that validate as `itemtype`"""
        folder_id = self.folder.id
        for item_id in Ecco.GetFolderItems(folder_id, *self.criteria):
            klass = _find_item_subclass(self.itemtype, item_id)
            if klass is None:
                continue
            yield klass(item_id, __class__=klass)

    def __gt__(self, value):
        return self._compare("GT", value)

    def __ge__(self, value):
        return self._compare("GE", value)

    def __lt__(self, value):
        return self._compare("LT", value)

    def __le__(self, value):
        return self._compare("LE", value)

    def __eq__(self, value):
        return self._compare("EQ", value)

    def __ne__(self, value):
        return self._compare("NE", value)

    def startswith(self, value):
        """Query using the "TB" criterion with the (unencoded) `value`"""
        return self._query("TB", value)

    def with_text(self, value):
        """Query using the "TC" criterion with the (unencoded) `value`"""
        return self._query("TC", value)

    def without_text(self, value):
        """Query using the "TN" criterion with the (unencoded) `value`"""
        return self._query("TN", value)

    def __pos__(self):
        """``+container``: query using the "va" criterion"""
        return self._query("va")

    def __neg__(self):
        """``-container``: query using the "vd" criterion"""
        return self._query("vd")

    def __repr__(self):
        details = (self.itemtype.__name__, self.folder, self.criteria)
        return "Container(%s, %r, %r)" % details

    def setdefault(self, __key, text, **defaults):
        """Return the item with the unique key, creating it when missing

        A new item is created from `text` and `defaults`, and the key is then
        stored in this container's folder.
        """
        found = self.get(__key)
        if found is not None:
            return found
        created = self.itemtype(text, **defaults)
        self.folder.__set__(created, __key)
        return created

    def get(self, key, default=None):
        """Return the single item whose folder value equals `key`

        Returns `default` when there is none; raises KeyError when more than
        one item matches.
        """
        matches = list(self==key)
        if len(matches)>1:
            raise KeyError("Multiple items for", key)
        if not matches:
            return default
        return matches[0]

    def __getitem__(self, key):
        """Like get(), but raise KeyError when no item matches"""
        found = self.get(key)
        if found is None:
            raise KeyError(key)
        return found

    def __contains__(self, __key):
        """True if any item has the given folder value"""
        for _ in (self==__key):
            return True
        return False

    def __and__(self, other):
        return intersect(self, other)

    def __rand__(self, other):
        return intersect(other, self)

    def __or__(self, other):
        return union(self, other)

    def __ror__(self, other):
        return union(other, self)




class FolderClass(type):
    """Metaclass giving folder classes iteration and ``in`` support"""

    def __iter__(self):
        """Yield an instance of this class for each folder of its type

        A class whose `ftype` is None yields every folder in the outline.
        """
        wanted = self.ftype
        for fid, depth in Ecco.GetFolderOutline():
            if wanted is None:
                yield self(fid)
            elif Ecco.GetFolderType(fid)==wanted:
                yield self(fid)

    def __contains__(self, f):
        """Is folder `f` of this class's type?

        A class without an `ftype` compares against FolderType.CheckMark.
        """
        actual = Ecco.GetFolderType(int(f))
        return actual==(self.ftype or FolderType.CheckMark)

   




























class Folder(object):
    """Folder-based property/item container

    A Folder wraps one Ecco folder (``.id``, ``.name``).  Used as a class
    attribute of an item class it acts as a descriptor: on an item it
    reads/writes that item's value in the folder, and on the class it
    returns a ``Container`` for querying.  Instantiating the base ``Folder``
    class changes the instance's class to the subclass registered for the
    folder's type in ``folder_classes``.
    """
    __metaclass__ = FolderClass
    ftype = None    # FolderType handled by a subclass; None for the base

    def __init__(self, name_or_id, create=False):
        """Look up (or optionally create) a folder by name or id

        Raises KeyError if a named folder does not exist and `create` is
        false, and TypeError if `create` is used on the base class or the
        folder's type does not match this class.  A name shared by several
        folders fails in the single-element unpacking below.
        """
        if create and self.ftype is None:
            raise TypeError("You can only create Folder subclasses")
        if isinstance(name_or_id, basestring):
            fids = Ecco.GetFoldersByName(name_or_id)
            self.name = name_or_id
            if not fids:
                if create:
                    self.id = Ecco.CreateFolder(name_or_id, self.ftype)
                else:
                    raise KeyError(name_or_id)
            else:
                # exactly one folder may carry this name
                self.id, = fids
        else:
            self.id = name_or_id
            self.name = Ecco.GetFolderName(self.id)

        ftype = Ecco.GetFolderType(self.id)
        if self.ftype is None:
            # generic Folder(): become the subclass for the actual type
            self.__class__ = folder_classes[ftype]
        elif ftype != self.ftype:
            raise TypeError("%s is not a %s" %
                (name_or_id, self.__class__.__name__)
            )

    def __set__(self, ob, value):
        """Store the encoded `value` for item `ob` in this folder"""
        Ecco.SetFolderValues(int(ob), self.id, self.encode(value))

    def __get__(self, ob, typ=None):
        """Item access -> decoded value; class access -> Container"""
        if ob is None:
            return Container(typ, self)
        return self.decode(Ecco.GetFolderValues(int(ob), self.id))

    def __delete__(self, ob):
        """Clear item `ob`'s value in this folder (sets it to '')"""
        Ecco.SetFolderValues(int(ob), self.id, '')

    decorate(staticmethod)
    def encode(value):
        """Convert a Python value for storage; None becomes ''"""
        if value is None: return ''
        return value

    decorate(staticmethod)
    def decode(value):
        """Convert a stored value to Python (unchanged for the base class)"""
        return value

    def __getitem__(self, key):
        """cls->Container or item->value"""
        if isinstance(key, ItemClass):
            return Container(key, self)
        if isinstance(key, int):
            return self.decode(Ecco.GetFolderValues(int(key), self.id))
        raise TypeError(key)

    def __setitem__(self, key, value):
        """Set the folder value of the item (or item id) `key`"""
        if isinstance(key, int):
            return Ecco.SetFolderValues(int(key), self.id, self.encode(value))
        raise TypeError(key)

    def __contains__(self, item):
        """Is item in this folder?"""
        return self.id in Ecco.GetItemFolders(int(item))

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.name)

    # Each access recomputes the whole outline via all_folders(), whose
    # entries are (parent id, [child ids], depth) tuples.
    children = property(lambda self: map(Folder, all_folders()[self.id][1]))
    parent   = property(lambda self: Folder(all_folders()[self.id][0]))
    depth    = property(lambda self: all_folders()[self.id][2])

    def __iter__(self):
        """Iterate over the items in this folder, as ``Item`` instances"""
        return iter(Container(Item, self))

    def __int__(self):
        return self.id



class TextFolder(Folder):
    """Folder of type ``FolderType.Text``; uses the base encode/decode"""
    ftype = FolderType.Text

class PopupFolder(Folder):
    """Folder of type ``FolderType.PopUpList``; uses the base encode/decode"""
    ftype = FolderType.PopUpList

class CheckmarkFolder(Folder):
    """Folder of type ``FolderType.CheckMark``, holding boolean values"""
    ftype = FolderType.CheckMark

    decorate(staticmethod)
    def encode(value):
        """Any true value is stored as '1', anything else as ''"""
        if not value:
            return ''
        return '1'

    decorate(staticmethod)
    def decode(value):
        """Return True for any non-empty stored value, else False"""
        if value:
            return True
        return False

class DateFolder(Folder):
    """Folder of type ``FolderType.Date``, holding dates or datetimes"""
    ftype = FolderType.Date

    decorate(staticmethod)
    def encode(value):
        """Format date/datetime values for storage

        None becomes ''; values of any other type are passed through as-is.
        """
        if value is None:
            return ''
        # datetime.datetime is checked first because it subclasses date
        if isinstance(value, datetime.datetime):
            formatter = format_datetime
        elif isinstance(value, datetime.date):
            formatter = format_date
        else:
            return value
        return formatter(value)

    decorate(staticmethod)
    def decode(value):
        """Parse a stored ``YYYYMMDD`` or ``YYYYMMDDHHMM...`` string

        Returns None for an empty value, a ``date`` for an 8-character value,
        and otherwise a ``datetime`` built from the hour and minute digits.
        """
        if not value:
            return None
        year, month, day = int(value[:4]), int(value[4:6]), int(value[6:8])
        if len(value)==8:
            return datetime.date(year, month, day)
        hour, minute = int(value[8:10]), int(value[10:12])
        return datetime.datetime(year, month, day, hour, minute)


class NumericFolder(Folder):
    """Folder of type ``FolderType.Number``, holding int or Decimal values

    `encode` and `decode` are static methods, like those of every other
    folder class, so they can also be called on the class itself (the
    module-level ``folder_decoders`` table takes ``decode`` from the class).
    """
    ftype = FolderType.Number

    decorate(staticmethod)
    def encode(value):
        """Return `value` as a decimal string; None becomes ''"""
        if value is None: return ''
        return str(Decimal(value))

    decorate(staticmethod)
    def decode(value):
        """Parse a stored number: None if empty, Decimal if it has a '.'"""
        if not value:
            return None
        if '.' in value:
            return Decimal(value)
        return int(value)


# The concrete folder classes, one per supported FolderType.
folder_classes = [
    TextFolder, PopupFolder, CheckmarkFolder, DateFolder, NumericFolder
]

# Rebound as a mapping: FolderType value -> folder class (Folder.__init__
# uses it to pick the class for a folder's actual type).
folder_classes = dict([(f.ftype, f) for f in folder_classes])
# FolderType value -> that class's `decode` attribute, looked up on the class.
folder_decoders = dict([(t, f.decode) for t,f in folder_classes.items()])

def all_folders():
    """Return a mapping of folder ids to (parentid, [childids], depth)

    The outline from ``Ecco.GetFolderOutline()`` is unpacked here as
    ``(folder id, depth)`` pairs, with depth counted from 0 for top-level
    folders, whose parent id is None.
    (NOTE(review): confirm the pair order against the EccoDDE docs.)
    """
    outline = {}
    open_levels = []    # one (parent, sibling list) entry per open depth
    parent, siblings = None, []
    for fid, depth in Ecco.GetFolderOutline():
        # climb back up to the level this folder belongs to
        while len(open_levels) > depth:
            parent, siblings = open_levels.pop()
        siblings.append(fid)
        open_levels.append((parent, siblings))
        own_children = []
        outline[fid] = (parent, own_children, depth)
        # following deeper entries are children of this folder
        parent, siblings = fid, own_children
    return outline





def _find_item_subclass(cls, itemid=None, data=(), required=False):
    """Return the most specific subclass of `cls` that an item matches

    `itemid` is an existing item whose folder values are fetched from Ecco
    (None for an item that does not exist yet), and `data` is a sequence of
    ``(folder id, value)`` pairs that override/extend those values.

    Returns the matching class, or None if not even `cls` matches; with
    `required` true a TypeError is raised instead of returning None.
    TypeError is also raised when several sibling classes match.
    """
    get = _folder_bits.get
    mask, values = 0, []
    if itemid is not None:
        fids = Ecco.GetItemFolders(itemid)
        # only folders that some item class cares about have a bit assigned
        for fid in fids:
            bit = get(fid, 0)
            if bit:
                values.append(fid)
                mask |= bit
        # fetch the values of just those folders: folder id -> value
        values = dict(zip(values, Ecco.GetFolderValues(itemid, values)))
    else:
        values = {}
    if data:
        values.update(data)
        mask |= reduce(operator.or_, [get(fid,0) for fid,val in data], 0)

    matches = []
    match = None
    candidates = [cls]
    # Each pass checks one level of the class tree: first `cls` itself, then
    # the subclasses of whichever class matched on the previous pass.
    while True:
        for subclass in candidates:
            m = subclass._folder_mask
            # must be in every required folder and in no excluded folder
            if (mask & m)!=m or (mask & subclass._exclusion_mask):
                continue
            for k, v in subclass._required_values.iteritems():
                # a required value of None accepts any value
                if v!=values[k] and v is not None:
                    break
            else:
                # no required value mismatched; run the field validator
                if subclass._validate_fields(values):
                    matches.append(subclass)
        if matches:
            if len(matches)>1:
                raise TypeError("Validation ambiguity:",itemid or None,matches)
            match = matches.pop()
            # descend, considering only subclasses that have their own
            # _validate_fields in their class dict
            candidates = [c for c in match.__subclasses__() if '_validate_fields' in c.__dict__]
        elif match is None and required:
            raise TypeError # XXX error message
        else:
            # nothing more specific matched: return the last match (or None)
            return match

def additional_tests():
    """Return a test suite running the doctests in README.txt

    The file is located relative to this module; ELLIPSIS and
    NORMALIZE_WHITESPACE are enabled for all of its examples.
    """
    from doctest import DocFileSuite, ELLIPSIS, NORMALIZE_WHITESPACE
    flags = ELLIPSIS|NORMALIZE_WHITESPACE
    return DocFileSuite('README.txt', optionflags=flags)




































# cvs-admin@eby-sarna.com
#
# Powered by ViewCVS 1.0-dev
#
# ViewCVS and CVS Help