[Subversion] / PEAK / src / peak / util / fmtparse.py  

View of /PEAK/src/peak/util/fmtparse.py

Parent Directory | Revision Log
Revision: 1797 - (download) (as text)
Mon Aug 16 00:17:47 2004 UTC (19 years, 8 months ago) by pje
File size: 21212 byte(s)
Removed memoization from 'fmtparse': empirical testing showed that the
cache hit rate was incredibly low: only 18 hits in the entire PEAK test
suite.  Analysis suggests that only grammars having alternatives with
similar prefixes are likely to actually receive any benefit from
memoization.  By contrast, URLs and other simple, structural syntaxes of
the kind 'fmtparse' is intended for, almost never require backtracking,
and therefore don't really need the memoization.  Removing the memoization
and related code appears to shave almost a tenth of a second off the test
suite's run time.
"""Parsing and formatting based on production rules

 This module provides some special parsing and formatting features not found
 in other Python parsing libraries.  Specifically it:

    * supports reversing the production process to convert parsed output back
      to a canonical version of the input

    * Supports "smart tokenizing" based on contextually-available delimiters

    * does not separate "lexing" or "tokenizing" from "parsing", so lexical
      analysis can be parse-context dependent

    * is designed to accomodate parsing things other than strings (e.g. object
      streams, SAX event lists,...?)

    * allows the definition of arbitrary "rule", "input" and "state" objects
      that can be fitted into the framework to handle specialized input types,
      context passing, etc.

 The first three features were critical requirements for PEAK's URL-parsing
 tools.  We wanted to make it super-easy to create robust URL syntaxes that
 would produce canonical representations of URLs from input data as well as
 sensibly parse input strings.  And part of "super-easy" meant not having
 to write bazillions of regular expressions to parse every field in a URL.

 Limitations:

    - The framework isn't designed for "formatting" to non-strings.
      Specifically, most rules assume that their sub-rules will only
      'write()' things that can be joined with '"".join()' when formatting.

    - Some parts of the framework may not be 100% Unicode-safe, even if a
      UnicodeInput type were implemented.  Code review and patches appreciated.

 TODO:

    * Docstrings, docstrings, docstrings...  and a test suite!

    * ParseError should provide line/column info about the error location, not
      just offset, and it should be provided by input.error() rather than by
      the rule signalling the error.  Of course, all the rules should be
      calling input.error() instead of creating ParseError instances...

    * Perform timing tests and investigate parsing speedups for:

        - "Compiling" rules to regular expressions + postprocessors

        - "Optimizing" rules (e.g. convert Optional(user, '@') to something
          that forward-scans for '@' before trying to match 'user').

        - Moving speed-critical parts to C
"""

from peak.util.symbols import NOT_GIVEN
import protocols; from protocols import adapt

import re

__all__ = [
    'ParseError', 'MissingData', 'Rule', 'Sequence', 'Repeat', 'Optional',
    'Tuple', 'Named', 'ExtractString', 'Conversion', 'Alternatives',
    'Set', 'MatchString', 'parse', 'format', 'Input', 'StringInput',
    'StringConstant', 'Epsilon', 'IRule', 'IInput',
]

class ParseError(Exception):
    pass

class MissingData(Exception):
    pass

from peak.binding.once import Make  # XXX Core being used in a utility!

def uniquechars(s):
    return ''.join(dict([(c,c) for c in s]))






class IRule(protocols.Interface):

    """Production rule protocol for translation between data and strings"""

    def parse(input, produce, startState):
        """Parse 'input' at 'startState', call 'produce(result)', return state

        Return a 'ParseError' instance if no match"""

    def format(data, write):
        """Pass formatted 'data' to 'write()'"""

    def withTerminators(terminators, memo):
        """Return a version of this syntax using the specified terminators"""

    def getOpening(closing, memo):
        """Possible opening characters for this match, based on 'closing'"""


protocols.declareAdapter(
    lambda o: Optional(*o),
    provides=[IRule],
    forTypes=[tuple]
)

















class Rule(object):

    """Production rule protocol for translation between data and strings"""

    protocols.advise(
        instancesProvide=[IRule]
    )

    __slots__ = ()

    def parse(self, input, produce, startState):
        raise NotImplementedError

    def format(self, data, write):
        raise NotImplementedError

    def withTerminators(self, terminators, memo):
        raise NotImplementedError

    def getOpening(self,closing,memo):
        raise NotImplementedError




















class IInput(protocols.Interface):

    def reset():
        """"""

    def parse(rule,produce,state):
        """"""

    def error(rule,state,*args):
        """"""

    def startState():
        """"""

    def finished(state):
        """"""

























class Input(object):

    protocols.advise(
        instancesProvide=[IInput]
    )

    def __init__(self, *__args,**__kw):
        super(Input,self).__init__(*__args,**__kw)

    def reset(self):
        pass

    def parse(self, rule, produce, state):
        return rule.parse(self, produce, state)

    def error(self, rule, state, *args):
        return ParseError(rule=rule, state=state, *args)

    def startState(self):
        raise NotImplementedError

    def finished(self,state):
        raise NotImplementedError


















class Epsilon(Rule):

    """Simplest possible rule: matches nothing, formats nothing"""

    def parse(self, input, produce, startState):
        return startState

    def format(self,data,write):
        pass

    def withTerminators(self,terminators,memo):
        return self

    def getOpening(self,closing,memo):
        return closing


























class Sequence(Rule):

    closingChars = ''

    def __init__(self, *__args, **__kw):
        self.__dict__.update(__kw)
        self.initArgs = __args

    def parse(self, input, produce, startState):

        state = startState
        data = []; append = data.append

        for rule in self.rules:
            state = rule.parse(input, append, state)
            if isinstance(state,ParseError):
                return state

        map(produce, data)
        return state

    def format(self, data, write):
        for rule in self.rules:
            rule.format(data,write)


    def withTerminators(self, terminators, memo):
        key = id(self), terminators
        if key in memo:
            return memo[key][0]

        r = self.__class__(*self.initArgs, **{'closingChars':terminators})
        memo[key] = r, self
        r._computeRules(memo)
        return r

    def getOpening(self,closing,memo):
        if 'rules' not in self.__dict__:
            self._computeRules(memo)
        return self.openingChars

    def _computeRules(self, memo=None):

        obs = [adapt(ob,IRule) for ob in self.initArgs]

        obs.reverse()

        closeAll = self.closingChars
        closeCtx = ''

        syn = []
        self.__dict__.setdefault('rules',syn)

        if not memo:
            memo = { (id(self),closeAll): (self, self) }

        if obs:
            self.openingChars = obs[-1].getOpening('',memo)

        for ob in obs:
            terms = uniquechars(closeAll+closeCtx)
            key = id(ob), terms
            if key in memo:
                ob = memo[key][0]
            else:
                ob = ob.withTerminators(terms, memo)
                memo[key] = ob, self
            closeCtx = ob.getOpening(closeCtx,memo)
            syn.append(ob)

        self.openingChars = closeCtx
        syn.reverse()
        self.rules = syn
        return syn

    rules = Make(lambda self: self._computeRules(), attrName='rules')

    openingChars = ''




class Optional(Sequence):

    """Wrapper that makes a construct optional"""

    def parse(self, input, produce, startState):
        state = super(Optional,self).parse(input,produce,startState)
        if isinstance(state,ParseError):
            return startState
        return state

    def format(self, data, write):

        """Return a value if any non-empty non-constant parts"""

        out = []
        const = []

        for rule in self.rules:

            try:
                rule.format(data,out.append)
            except MissingData:
                out.append('')

            if isinstance(rule,str):
                const.append(rule)

        if ''.join(out) != ''.join(const):
            # output if there are any "variable" contents
            map(write,out)

    def getOpening(self,closing,memo):
        if 'rules' not in self.__dict__:
            self._computeRules(memo)
        return self.openingChars+closing






class StringConstant(Rule, str):

    protocols.advise(instancesProvide=[IRule], asAdapterForTypes=[str])

    def __new__(klass, value, caseSensitive=True):
        self=super(StringConstant,klass).__new__(klass,value)
        self.caseSensitive = caseSensitive
        return self

    def parse(self, inputStr, produce, startState):

        state = startState+len(self)
        if self.caseSensitive:
            if inputStr[startState:state]==self:
                return state
        elif inputStr[startState:state].lower()==self.lower():
            return state

        return ParseError(
            "Expected %r, found %r at position %d in %r" %
            (self, inputStr[startState:state], startState, inputStr)
        )


    def format(self,data,write):
        write(self)


    def withTerminators(self,terminators,memo):
        return self


    def getOpening(self,closing,memo):
        if self:
            if self.caseSensitive:
                return self[0]
            else:
                return self[0].lower()+self[0].upper()+self[0]
        return closing


class Repeat(Rule):
    minCt = 0
    maxCt = None
    sepMayTerm = False  # is separator allowed as terminator?
    closingChars = ''
    separator = Epsilon()

    def parse(self, input, produce, startState):

        rule = self.rule.parse
        state = startState
        data = []; append = data.append; sep=self.sep.parse

        newState = rule(input, append, state)

        if isinstance(newState,ParseError):
            if self.minCt>0:
                return newState  # it's an error
            else:
                return state  # non-error empty match

        ct = 1; maxCt = self.maxCt; state = newState
        while not input.finished(state) and (maxCt is None or ct<maxCt):

            newState = sep(input, append, state)
            if isinstance(newState,ParseError):
                break   # no more items

            state = newState
            newState = rule(input, append, state)
            if isinstance(newState,ParseError):
                if self.sepMayTerm:
                    break   # if sep can be term, it's okay to end here
                return newState  # otherwise pass the error up

            ct += 1
            state = newState

        produce(data)   # treat repeat as a list
        return state

    def format(self, data, write):

        if not data:
            return

        rule = self.rule

        for d in data[:-1]:
            rule.format(d,write)
            self.sep.format(d,write)

        rule.format(data[-1],write)


    def withTerminators(self,terminators,memo):

        key = id(self), terminators
        if key in memo:
            return memo[key][0]

        kw = self.__dict__.copy()
        if 'rule' in kw:
            del kw['rule']
        del kw['initArgs']

        kw['closingChars'] = uniquechars(terminators+self.closingChars)
        r = self.__class__(
            self.rule.withTerminators(kw['closingChars'],memo),
            **kw
        )
        memo[key] = r, self
        r._computeRule(memo)
        return r








    def _computeRule(self, memo=None):

        rule = self.rule = adapt(self.initArgs[0],IRule)

        closeAll = uniquechars(self.closingChars)

        if not memo:
            memo = { (id(self),closeAll): (self, self) }

        key = id(rule), closeAll

        if key in memo:
            rule = memo[key][0]
        else:
            rule = rule.withTerminators(closeAll, memo)
            memo[key] = rule, self

        return rule

    rule = Make(lambda self: self._computeRule(), attrName='rule')

    closingChars = ''

    def __init__(self, *__args, **__kw):
        if len(__args)<>1:
            return self.__init__(Sequence(*__args),**__kw)
        self.__dict__.update(__kw)
        self.initArgs = __args
        self.sep = adapt(self.separator, IRule)
        if 'closingChars' not in self.__dict__:
            self.closingChars = self.sep.getOpening('',{})

    def getOpening(self,closing,memo):
        if 'rule' not in self.__dict__:
            self.rule = self._computeRule(memo)
        return self.rule.getOpening(closing,memo)





class Tuple(Sequence):

    """Sequence of unnamed values, rendered as a tuple (e.g. key/value)"""

    def parse(self, input, produce, startState):

        out = []
        state = super(Tuple,self).parse(input, out.append, startState)

        if isinstance(state,ParseError):
            return state

        produce( tuple(out) )
        return state


    def format(self, data, write):
        p = 0
        for rule in self.rules:
            rule.format(data[p],write)
            if not isinstance(rule,str):
                p+=1



















class MatchString(Rule):

    """Match a regex, or longest string that doesn't include a terminator"""

    output = ''
    openingChars = ''
    terminators = ''
    explicit_pattern = False

    def __init__(self,pattern=None,**kw):

        self.__dict__.update(kw)

        if pattern is None:
            self.explicit_pattern = False
            if self.terminators:
                pattern = '[^%s]*' % re.escape(
                    ''.join(dict([(t[:1],1) for t in self.terminators]))
                )
            else:
                pattern = '.+'

        else:
            self.explicit_pattern = True

        if isinstance(pattern,str):
            pattern = re.compile(pattern)

        self.pattern = pattern


    def withTerminators(self,terminators,memo):
        if terminators==self.terminators:
            return self
        d = self.__dict__.copy()
        if not self.explicit_pattern:
            del d['pattern']
        d['terminators'] = terminators
        return self.__class__(**d)


    def parse(self, inputStr, produce, startAt):
        m = self.pattern.match(inputStr, startAt)
        if not m:
            return ParseError(
                "Failed match for %r at position %d in %r" %
                (self.pattern.pattern, startAt, inputStr)
            )
        state = m.end()
        return state


    def getOpening(self,closing,memo):
        return self.openingChars


    def format(self,data,write):

        if self.pattern is None:
            write(data)
        elif self.output is not None:
            write(self.output)
        else:
            raise MissingData   # we don't know how to format!


    # Hash/compare based on pattern so that equivalent rules
    # will memoize the same

    def __hash__(self):
        return hash(self.pattern)

    def __cmp__(self,other):
        return cmp(self.pattern, other)








class ExtractString(Rule):

    """Return matched subrule as a string"""

    __slots__ = 'rule', 'terminators'

    def __init__(self, rule=MatchString(), terminators=''):
        self.rule = adapt(rule,IRule)
        self.terminators = terminators

    def parse(self, input, produce, startState):
        out = []
        state = self.rule.parse(input, out.append, startState)
        if isinstance(state,ParseError):
            return state

        produce(input[startState:state])
        return state


    def withTerminators(self,terminators,memo):
        if self.terminators==terminators:
            return self
        return self.__class__(
            self.rule.withTerminators(terminators,memo), terminators
        )

    def getOpening(self,closing,memo):
        return self.rule.getOpening(closing,memo)

    def format(self,data,write):
        write(data)









class Named(Rule):

    """Named value - converts to/from dictionary/items"""

    __slots__ = 'name', 'rule'

    def __init__(self, name, rule=ExtractString()):
        self.name = name
        self.rule = adapt(rule,IRule)

    def withTerminators(self,terminators,memo):
        return self.__class__(
            self.name, self.rule.withTerminators(terminators,memo)
        )

    def getOpening(self,closing,memo):
        return self.rule.getOpening(closing,memo)


    def parse(self, input, produce, startState):
        return self.rule.parse(
            input, lambda v: produce((self.name,v)), startState
        )

    def format(self, data, write):
        try:
            value = data[self.name]
        except KeyError:
            raise MissingData(self.name)
        return self.rule.format(value, write)











class Conversion(Rule):

    formatter = str
    converter = str
    canBeEmpty = False
    defaultValue = NOT_GIVEN


    def __init__(self, rule=ExtractString(), **kw):
        self.__dict__.update(kw)
        self.rule = adapt(rule, IRule)


    def parse(self, input, produce, startState):

        out = []
        state = self.rule.parse(input, out.append, startState)
        if isinstance(state,ParseError):
            return state

        out, = out
        if out=='' and self.canBeEmpty and self.defaultValue is not NOT_GIVEN:
            value = self.defaultValue
        else:
            value = self.converter(out)

        produce(value)
        return state


    def withTerminators(self, terminators,memo):

        kw = self.__dict__.copy()
        kw['rule'] = self.rule.withTerminators(terminators,memo)
        return self.__class__(**kw)


    def getOpening(self,closing,memo):
        return self.rule.getOpening(closing,memo)


    def format(self, data, write):

        if self.defaultValue is not NOT_GIVEN and data==self.defaultValue:
            if self.canBeEmpty:
                return
            else:
                raise MissingData

        value = self.formatter(data)
        if not value and not self.canBeEmpty:
            raise MissingData

        self.rule.format(value, write)




























class Alternatives(Rule):

    """Match one out of many alternatives"""

    __slots__ = 'alternatives'

    def __init__(self, *alternatives):
        self.alternatives = [adapt(a,IRule) for a in alternatives]

    def withTerminators(self,terminators,memo):
        return self.__class__(
            *tuple([a.withTerminators(terminators,memo) for a in self.alternatives])
        )

    def getOpening(self,closing,memo):
        return ''.join([a.getOpening('',memo) for a in self.alternatives])

    def parse(self, input, produce, startState):
        for rule in self.alternatives:
            state = rule.parse(input,produce,startState)
            if not isinstance(state,ParseError):
                return state
        else:
            return ParseError(
                "Parse error at position %d in %r" % (startState,input)
            )

    def format(self, data, write):
        for rule in self.alternatives:
            try:
                out = []
                rule.format(data,out.append)
            except MissingData:
                continue
            if out:
                map(write,out)
                return
        else:
            raise MissingData


class Set(Alternatives):

    def parse(self, input, produce, startState):

        rules = self.alternatives[:]
        state = startState

        while rules and not input.finished(state):
            for rule in rules:
                newPos = rule.parse(input,produce,state)
                if isinstance(newPos,ParseError):
                    continue
                else:
                    state = newPos    # XXX handle separator?
                    rules.remove(rule)
                    break
            else:
                return ParseError(
                    "No match found at position %d in %r" % (state,input)
                )

        return state

    def format(self, data, write):

        for rule in self.alternatives:
            try:
                out = []
                rule.format(data,out.append)
            except MissingData:
                continue
            if out:
                map(write,out)








    def withTerminators(self,terminators,memo):
        # make sure alternatives consider each other as possible terminators
        return super(Set,self).withTerminators(
            uniquechars(
                terminators+''.join(
                    [a.getOpening('',memo) for a in self.alternatives]
                )
            ), memo
        )
































class StringInput(Input, str):

    protocols.advise(
        instancesProvide=[IInput], asAdapterForTypes=[str]
    )

    def startState(self):
        return 0

    def finished(self,state):
        return state>=len(self)



def parse(input, rule):

    out = []
    input = adapt(input,IInput)
    state = rule.parse(input, out.append, input.startState())

    if isinstance(state,ParseError):
        raise state

    if not input.finished(state):
        raise ParseError(
            "Expected EOF, found %r at position %d in %r" %
            (input[state:], state, input)
        )

    return dict(out)


def format(aDict, syntax):
    out = []
    syntax.format(aDict, out.append)
    return ''.join(out)






cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help