""" |
""" |
|
|
|
|
from xml.sax import ContentHandler, parse |
from xml.sax.saxutils import XMLGenerator, quoteattr, escape |
from xml.sax.saxutils import XMLGenerator |
from protocols import Interface, advise, Adapter |
from peak.interface import Interface, implements |
|
from kjbuckets import kjGraph |
from kjbuckets import kjGraph |
|
|
__all__ = [ |
__all__ = [ |
'load', 'ISOXNode', 'ISOXNode_NS', 'ObjectMakingHandler', 'NSHandler', |
'load', 'ISOXNode', 'ISOXNode_NS', 'IXMLBuilder', 'ExpatBuilder', |
'Node', 'Node_NS', 'Document', 'Document_NS', 'IndentedXML', |
'Node', 'Node_NS', 'Document', 'Document_NS', 'IndentedXML', |
] |
] |
|
|
|
|
class ISOXNode(Interface): |
class ISOXNode(Interface): |
|
|
"""Object mapping from an XML element |
"""Object mapping from an XML element |
def _addText(text): |
def _addText(text): |
"""Add text string 'text' to node""" |
"""Add text string 'text' to node""" |
|
|
def _addNode(subObj): |
def _addNode(name,subObj): |
"""Add finished sub-node 'subObj' to node""" |
"""Add finished sub-node 'subObj' to node""" |
|
|
def _finish(): |
def _finish(): |
"""Add text string 'text' to node""" |
"""Add text string 'text' to node""" |
|
|
|
|
def _addNode(subObj): |
def _addNode(name,subObj): |
"""Add finished sub-node 'subObj' to node""" |
"""Add finished sub-node 'subObj' to node""" |
|
|
|
|
|
|
|
|
|
|
class ObjectMakingHandler(ContentHandler):

    """SAX handler that makes a pseudo-DOM

    The handler keeps a stack of nodes under construction, seeded with the
    'documentRoot' object.  Each start tag asks the node on top of the stack
    for a new child ('_newNode()'), each end tag finishes the top node
    ('_finish()') and hands the result to its parent ('_addNode()').  After
    the document ends, the finished root is available as 'self.document'.
    """

    def __init__(self, documentRoot):
        # 'documentRoot' is the bottom of the stack for the whole parse
        self.stack = [documentRoot]
        ContentHandler.__init__(self)

    def startElement(self, name, atts):
        """Create a node for tag 'name' and push it on the stack"""
        top = self.stack[-1]
        node = top._newNode(name, atts)
        node._acquireFrom(top)
        self.stack.append(node)

    def characters(self, ch):
        """Pass character data 'ch' to the node currently being built"""
        self.stack[-1]._addText(ch)

    def endElement(self, name):
        """Finish the top node and add the result to its parent

        Raises 'SyntaxError' if the closing tag 'name' does not match the
        '_name' of the node being closed.
        """
        stack = self.stack
        top = stack.pop()

        if top._name != name:
            # Call form of 'raise' is accepted by both Python 2 and 3
            raise SyntaxError(
                "End tag '%s' found when '%s' was wanted" % (name, top._name)
            )

        out = top._finish()

        # A node may finish to 'None', in which case nothing is added
        if out is not None:
            stack[-1]._addNode(name, out)

    def endDocument(self):
        """Finish the root object, store it as 'self.document', drop stack"""
        self.document = self.stack[0]._finish()
        del self.stack


class IXMLBuilder(Interface):

    """Methods the parser calls on the object assembling one element"""

    def _xml_addChild(data):
        """Add 'data' to element's children"""

    def _xml_finish():
        """Return finished value to be passed to parent's 'addChild()'"""

    def _xml_newTag(name,attrs,newPrefixes,parser):
        """Create and return a subnode for a tag"""

    def _xml_addText(xml):
        """Return a new subnode for text"""

    def _xml_addLiteral(xml):
        """Return a new subnode for literals such as comments, PIs, etc."""
|
|
|
|
|
|
|
|
|
|
|
class NSHandler(ObjectMakingHandler):

    """Namespace-handling SAX handler; uses newer interface

    In addition to the node stack kept by 'ObjectMakingHandler', this
    handler tracks the namespace declarations currently in force:

    'ns2uri' -- dictionary mapping prefix -> URI ('' is the default NS)
    'uri2ns' -- 'kjGraph' built as the inverse of 'ns2uri'
    'nsStack' -- saved '(ns2uri, uri2ns)' pairs, restored by 'endElement()'
    """

    def __init__(self, documentRoot):
        ObjectMakingHandler.__init__(self, documentRoot)
        self.ns2uri = {}
        self.uri2ns = kjGraph()
        self.nsStack = []

    def startElement(self, name, atts):
        """Create a node for tag 'name', processing 'xmlns' declarations

        'xmlns' and 'xmlns:prefix' attributes are removed from the attribute
        mapping given to the new node and registered via 'add_prefix()'.
        If the tag declared at least one namespace, the new node receives
        the resulting mappings through '_setNS()'.
        """
        a = {}
        prefix = None

        for k, v in atts.items():
            a[k] = v

            if k.startswith('xmlns'):
                rest = k[5:]

                if rest:
                    if rest[0] == ':':
                        prefix = rest[1:]
                    else:
                        # e.g. 'xmlnsfoo': an ordinary attribute, keep it
                        continue
                else:
                    # plain 'xmlns': the default namespace
                    prefix = ''

                del a[k]
                self.add_prefix(prefix, v)

        top = self.stack[-1]
        node = top._newNode(name, a)
        self.stack.append(node)

        # 'prefix' stays 'None' unless this tag declared a namespace
        if prefix is not None: node._setNS(self.ns2uri, self.uri2ns)

    def add_prefix(self, prefix, uri):
        """Map 'prefix' to 'uri', saving the previous mappings on 'nsStack'"""

        # Pad 'nsStack' with the current mappings until it is deeper than
        # the node stack, so 'endElement()' can tell when to restore them
        while len(self.nsStack) <= len(self.stack):
            self.nsStack.append( (self.ns2uri, self.uri2ns) )

        # Work on a copy: the saved (and any node-held) dictionary must not
        # change when the new prefix is added
        self.ns2uri = self.ns2uri.copy()
        self.ns2uri[prefix] = uri
        self.uri2ns = ~kjGraph( self.ns2uri.items() )

    def endElement(self, name):
        """Restore saved namespace mappings, then close the element"""

        while len(self.nsStack) >= len(self.stack):
            self.ns2uri, self.uri2ns = self.nsStack.pop()

        ObjectMakingHandler.endElement(self, name)


class SoxNodeAsXMLBuilder(Adapter):

    """Adapter that lets an 'ISOXNode' be driven through 'IXMLBuilder'"""

    advise(
        instancesProvide=[IXMLBuilder],
        asAdapterForProtocols=[ISOXNode]
    )

    def _xml_addText(self,text):
        self.subject._addText(text)

    def _xml_addLiteral(self,text):
        # Literals (comments etc.) are ignored for SOX nodes
        pass

    def _xml_finish(self):
        return self.subject._finish()

    def _xml_addChild(self,node):
        # Uses the name remembered by the most recent '_xml_newTag()' call
        self.subject._addNode(self.lastName,node)   # XXX

    def _xml_newTag(self,name,attrs,newPrefixes,parser):
        node = self.subject._newNode(name,dict(attrs))
        node._acquireFrom(self.subject)
        self.lastName = name
        return node
|
|
|
|
|
|
|
|
|
|
|
|
|
class NSNodeAsXMLBuilder(Adapter):

    """Adapter that lets an 'ISOXNode_NS' be driven through 'IXMLBuilder'"""

    advise(
        instancesProvide=[IXMLBuilder],
        asAdapterForProtocols=[ISOXNode_NS]
    )

    def _xml_addText(self,text):
        self.subject._addText(text)

    def _xml_addLiteral(self,text):
        # Literals (comments etc.) are ignored for SOX nodes
        pass

    def _xml_finish(self):
        return self.subject._finish()

    def _xml_addChild(self,node):
        # Uses the name remembered by the most recent '_xml_newTag()' call
        self.subject._addNode(self.lastName,node)   # XXX

    def _xml_newTag(self,name,attrs,newPrefixes,parser):
        """Create a subnode for tag 'name', passing on namespace mappings

        When the tag declares new prefixes, the node is given a prefix->URI
        dictionary taken from the top of each per-prefix URI stack in
        'parser.nsInfo', together with a 'kjGraph' built as its inverse.
        """
        node = self.subject._newNode(name,dict(attrs))

        if newPrefixes:
            # A prefix whose declaring element has already been closed can
            # be left in 'nsInfo' with an empty URI stack; it is no longer
            # in scope, and indexing it with [-1] would raise IndexError.
            ns2uri = dict(
                [(prefix,uris[-1])
                    for prefix,uris in parser.nsInfo.items() if uris]
            )
            node._setNS(ns2uri, ~kjGraph(ns2uri.items()))

        self.lastName = name
        return node
|
|
|
|
|
|
|
|
|
|
|
|
"""Simple, DOM-like ISOXNode implementation""" |
"""Simple, DOM-like ISOXNode implementation""" |
|
|
implements(ISOXNode) |
advise( instancesProvide = [ISOXNode] ) |
|
|
def __init__(self,name='',atts={},**kw): |
def __init__(self,name='',atts={},**kw): |
self._name = name |
self._name = name |
d = n._findFirst(name) |
d = n._findFirst(name) |
if d: return d |
if d: return d |
|
|
|
def _finish(self): |
def _finish(self): return self |
return self |
|
|
|
|
_acquiredAttrs = () |
_acquiredAttrs = () |
def _newNode(self,name,atts): |
def _newNode(self,name,atts): |
return Node(name,atts) |
return Node(name,atts) |
|
|
|
|
class Node_NS(Node): |
class Node_NS(Node): |
|
|
|
advise( instancesProvide = [ISOXNode_NS] ) |
ns2uri = {} |
ns2uri = {} |
uri2ns = kjGraph() |
uri2ns = kjGraph() |
|
|
if documentObject is None: |
if documentObject is None: |
documentObject = Document_NS() |
documentObject = Document_NS() |
|
|
handler = NSHandler(documentObject) |
|
|
|
else: |
else: |
if documentObject is None: |
if documentObject is None: |
documentObject = Document() |
documentObject = Document() |
|
|
handler = ObjectMakingHandler(documentObject) |
|
|
|
parse(filename_or_stream, handler) |
if isinstance(filename_or_stream,str): |
return handler.document |
filename_or_stream = open(filename_or_stream,'rt') |
|
|
|
elif hasattr(filename_or_stream,'getByteStream'): |
|
filename_or_stream = filename_or_stream.getByteStream() |
|
|
|
return ExpatBuilder().parseFile(filename_or_stream,documentObject) |
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExpatBuilder:

    """Parser that assembles a document

    Drives an expat parser and forwards its events to a stack of
    'IXMLBuilder' objects, one per open element, with the builder for the
    root object at the bottom.

    'stack' -- builders for the elements currently open
    'nsStack' -- for each open element, the list of prefixes it declared
    'nsInfo' -- for each prefix currently declared, a stack of its URIs
    """

    def __init__(self):
        self.parser = self.makeParser()
        self.stack = []     # "object being assembled" stack
        self.nsStack = []
        self.nsInfo = {}    # URI stack for each NS prefix

    def makeParser(self):
        """Return a new expat parser wired to this object's handlers"""
        from xml.parsers.expat import ParserCreate

        p = ParserCreate()
        # Attributes arrive as a flat [name, value, name, value, ...] list
        p.ordered_attributes = True

        try:
            p.returns_unicode = True
        except AttributeError:
            # Parsers that always return unicode have no such attribute
            pass

        p.specified_attributes = True
        p.StartElementHandler = self.startElement
        p.EndElementHandler = self.endElement
        p.CommentHandler = self.comment
        p.DefaultHandler = self.buildLiteral

        # We don't use:
        # .StartDoctypeDeclHandler
        # .StartNamespaceDeclHandler
        # .EndNamespaceDeclHandler
        # .XmlDeclHandler(version, encoding, standalone)
        # .ElementDeclHandler(name, model)
        # .AttlistDeclHandler(elname, attname, type, default, required)
        # .EndDoctypeDeclHandler()
        # .ProcessingInstructionHandler(target, data)
        # .UnparsedEntityDeclHandler(entityN,base,systemId,publicId,notationN)
        # .EntityDeclHandler(
        #       entityName, is_parameter_entity, value, base,
        #       systemId, publicId, notationName)
        # .NotationDeclHandler(notationName, base, systemId, publicId)
        # .StartCdataSectionHandler()
        # .EndCdataSectionHandler()
        # .NotStandaloneHandler()
        return p

    def parseFile(self, stream, rootNode):
        """Parse 'stream' into 'rootNode'; return the finished root value

        'rootNode' is adapted to 'IXMLBuilder' and becomes the bottom of the
        builder stack.  All parser state is reset first, so one instance
        can be used for several documents in succession.
        """
        # Re-running __init__ gives a fresh parser and empty stacks
        self.__init__()
        self.stack.append(IXMLBuilder(rootNode))
        self.parser.CharacterDataHandler = self.stack[-1]._xml_addText
        self.parser.ParseFile(stream)
        return self.stack[-1]._xml_finish()

    def comment(self,data):
        """Pass a comment to the current builder as literal XML"""
        self.buildLiteral(u'<!--%s-->' % data)

    def buildLiteral(self,xml):
        """Pass literal markup 'xml' to the current builder"""
        self.stack[-1]._xml_addLiteral(xml)

    def startElement(self, name, attrs):
        """Open tag 'name': record NS declarations, push a child builder

        'attrs' is the flat name/value list supplied by expat.  Every
        attribute (including 'xmlns' declarations) is passed to the
        builder's '_xml_newTag()' as a list of '(name, value)' pairs; the
        prefixes declared by this tag are passed separately.
        """
        prefixes = []
        a = []

        # Walk the flat list pairwise (no repeated 'pop(0)' shifting)
        for i in range(0, len(attrs), 2):
            k = attrs[i]
            v = attrs[i+1]
            a.append((k,v))

            if not k.startswith('xmlns'):
                continue

            rest = k[5:]
            if not rest:
                ns = ''             # plain 'xmlns': default namespace
            elif rest.startswith(':'):
                ns = rest[1:]       # 'xmlns:prefix'
            else:
                continue            # e.g. 'xmlnsfoo': ordinary attribute

            self.nsInfo.setdefault(ns,[]).append(v)
            prefixes.append(ns)

        self.nsStack.append(prefixes)
        element = self.stack[-1]._xml_newTag(name, a, prefixes, self)
        self.stack.append(IXMLBuilder(element))
        # Character data now belongs to the newly opened element
        self.parser.CharacterDataHandler = self.stack[-1]._xml_addText

    def endElement(self, name):
        """Close the current tag and hand its finished value to the parent"""
        last = self.stack.pop()
        self.parser.CharacterDataHandler = self.stack[-1]._xml_addText
        self.stack[-1]._xml_addChild(last._xml_finish())

        # Undo the namespace declarations made by the tag being closed
        for prefix in self.nsStack.pop():
            uris = self.nsInfo[prefix]
            uris.pop()
            if not uris:
                # Prefix is no longer in scope: don't leave an empty stack
                # behind for code that reads 'nsInfo[prefix][-1]'
                del self.nsInfo[prefix]
|
|
|
|
|
|
|
|
|
|
|
|