"""Element handling as described in RFC 4825. This module implements * location of an element in xml document * location of insertion point for a new element in xml document This allows to implement GET/PUT/DELETE for elements in XCAP server. Syntax for element selectors is a subset of XPATH, but an XPATH implementation was not used. One reason is that XPATH only implements locating an element but not an insertion point for an element selector which does not point to an existing element (but will point to the inserted element after PUT). For element selectors of type *[@att="value"] insertion point depends on the content of a new element. For RFC compliant behavior, fix such requests by replacing '*' with the root tag of the new element. """ import sys from StringIO import StringIO from application import log from xcap import uri def make_parser(): parser = sax.make_parser() parser.setFeature(sax.handler.feature_namespaces, 1) parser.setFeature(sax.handler.feature_namespace_prefixes, 1) return parser try: # we need feature_namespace_prefixes that is implemented in _xmlplus's expat # parser, but not in the stock xml package from _xmlplus import sax except ImportError: # let's hope for a miracle from xml import sax try: make_parser() except sax._exceptions.SAXNotSupportedException: # no miracle today, complain about the original error log.fatal("Package _xmlplus was not found on your system. Please install pyxml library") # comment the following line out if you don't need element operations sys.exit(1) class ThrowEventsAway(sax.ContentHandler): pass def check_xml_fragment(element_str): """Run SAX parser on element_str to check its well-formedness. Ignore unbound namespaces prefixes. >>> check_xml_fragment("") >>> check_xml_fragment(''' ... Test ... ''') >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: mismatched tag >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:5: not well-formed (invalid token) >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: junk after document element >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:4: not well-formed (invalid token) """ parser = sax.make_parser() # ignore namespaces and prefixes parser.setFeature(sax.handler.feature_namespaces, 0) parser.setFeature(sax.handler.feature_namespace_prefixes, 0) parser.setContentHandler(ThrowEventsAway()) parser.parse(StringIO(element_str)) class Step: # to be matched against uri.Step def __init__(self, name, position = 0): self.name = name # this integer holds index of a child element currently in processing self.position = position def __repr__(self): return '%s[pos=%s]' % (self.name, self.position) class ContentHandlerBase(sax.ContentHandler): def __init__(self, selector): sax.ContentHandler.__init__(self) self.selector = selector self.state = None self.locator = None def setDocumentLocator(self, locator): self.locator = locator def pos(self): return self.locator._ref._parser.CurrentByteIndex def set_state(self, new_state): #print new_state, 'at %s' % str(self.pos()) self.state = new_state def set_end_pos(self, end_pos, end_tag = None, end_pos_2 = None): self.end_pos = end_pos self.end_tag = end_tag self.end_pos_2 = end_pos_2 def fix_end_pos(self, document): if self.end_tag is not None and self.end_tag in document[self.end_pos:self.end_pos_2]: if self.end_pos_2 is None: self.end_pos = 1 + document.index('>', self.end_pos) else: self.end_pos = 1 + document.index('>', self.end_pos, self.end_pos_2) def __repr__(self): return '<%s selector=%r state=%r>' % (self.__class__.__name__, self.selector, self.state) class ElementLocator(ContentHandlerBase): """Locates element in a document by element selector expression (subset of XPATH defined in RFC 4825) There's an intentional difference from XPATH (at least as implemented in lxml): tail following the closing tag is not included in the end result (this doesn't make sense for XCAP and incompatible with some of the requirements in RFC). """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '-' * (len(self.path) + self.skiplevel), '<', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] #print `name`, `curstep.name` if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return if parent is None: if curstep.position not in [None, 1]: self.skiplevel = 1 return else: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return #print '%r matched' % curstep self.curstep += 1 self.path.append(Step(qname)) if len(self.path)==len(self.selector): if self.state=='LOOKING': self.set_state('FOUND') self.start_pos = self.pos() elif self.state=='DONE': self.set_state('MANY') def endElementNS(self, name, qname): #print '-' * (len(self.path) + self.skiplevel-1), '>', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return if len(self.path)==len(self.selector) and self.state=='FOUND': self.set_state('DONE') # QQQ why qname passed to endElementNS is None? qname = self.path[-1].name self.set_end_pos(self.pos(), '') # where does pos() point to? two cases: # 1. ....*HERE* # 2. *HERE*... # If it's the first case we need to adjust pos() by len('') # To determine the case, let's mark the position of the next startElement/endElement # and see if there '' substring right after end_pos limited by end_pos_2 # 1. ....*end_pos*...*end_pos_2*<... # 2. *end_pos*...*end_pos_2*<... element = self.path.pop() self.curstep -= 1 class InsertPointLocator(ContentHandlerBase): """Locate the insertion point -- where in the document a new element should be inserted. It operates under assumption that the request didn't yield any matches with ElementLocator (its state was 'LOOKING' after parsing). Note, that this class doesn't know what will be inserted and therefore may do not do what you want with requests like 'labels/*[att="new-att"]'. """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '<' * (1+len(self.path) + self.skiplevel), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return is_last_step = len(self.path)+1 == len(self.selector) if not is_last_step: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and \ attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return else: if curstep.position == 1 and parent.position == 1: self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2=self.pos()) self.curstep += 1 self.path.append(Step(qname)) def endElementNS(self, name, qname): #print '>' * (1+len(self.path)+self.skiplevel-1), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return qname = self.path[-1].name curstep = self.selector[-1] if len(self.path)==len(self.selector): parent = self.path[-2] if curstep.position is None: if self.state=='DONE': self.set_state('MANY') else: self.set_state('CLOSED') self.set_end_pos(self.pos(), '') elif curstep.position-1 == parent.position: if self.state=='DONE': self.set_state('MANY') else: self.set_state('DONE') self.set_end_pos(self.pos(), '') elif len(self.path)+1==len(self.selector): if self.state == 'CLOSED': self.set_state('DONE') if curstep.name=='*' and curstep.position is None: self.set_end_pos(self.pos(), end_pos_2 = self.pos()) elif self.state == 'LOOKING': self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2 = self.pos()) element = self.path.pop() self.curstep -= 1 class LocatorError(ValueError): def __init__(self, msg, handler=None): ValueError.__init__(self, msg) self.handler = handler @staticmethod def generate_error(locator, element_selector): if locator.state == 'LOOKING': return None elif locator.state == 'MANY': raise SelectorError(element_selector._original_string, locator) else: raise LocatorError('Internal error in %s' % locator.__class__.__name__, locator) class SelectorError(LocatorError): http_error = 404 def __init__(self, selector, handler=None): msg = 'The requested node selector %s matches more than one element' % selector LocatorError.__init__(self, msg, handler) def find(document, element_selector): """Return an element as (first index, last index+1) If it couldn't be found, return None. If there're several matches, raise SelectorError. """ parser = make_parser() el = ElementLocator(element_selector) parser.setContentHandler(el) parser.parse(StringIO(document)) if el.state == 'DONE': el.fix_end_pos(document) return (el.start_pos, el.end_pos) else: return LocatorError.generate_error(el, element_selector) def get(document, element_selector): """Return an element as a string. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[start:end] def delete(document, element_selector): """Return document with element deleted. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[:start] + document[end:] def put(document, element_selector, element_str): """Return a 2-items tuple: (new_document, created). new_document is a copy of document with element_str inside. created is True if insertion was performed as opposed to replacement. If element_selector matches an existing element, it is replaced with element_str. If not, it is inserted at appropriate place. If it's impossible to insert at this location, return None. If element_selector matches more than one element or more than one possible place to insert and there're no rule to resolve the ambiguity then SelectorError is raised. """ location = find(document, element_selector) if location is None: ipl = InsertPointLocator(element_selector) parser = make_parser() parser.setContentHandler(ipl) parser.parse(StringIO(document)) if ipl.state == 'DONE': ipl.fix_end_pos(document) start, end = ipl.end_pos, ipl.end_pos created = True else: return LocatorError.generate_error(ipl, element_selector) else: start, end = location created = False return (document[:start] + element_str + document[end:], created) # Q: why create a new parser for every parsing? # A: when sax.make_parser() was called once, I've occasionaly encountered an exception like this: # # File "/usr/lib/python2.5/site-packages/xcap/appusage/__init__.py", line 178, in _cb_get_element # result = XCAPElement.get(response.data, uri.node_selector.element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 323, in get # location = cls.find(document, element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 308, in find # cls.parser.setContentHandler(el) # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 128, in setContentHandler # self._reset_cont_handler() # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 234, in _reset_cont_handler # self._cont_handler.processingInstruction # exceptions.AttributeError: 'NoneType' object has no attribute 'ProcessingInstructionHandler' # # I have no idea what does that mean, but probably something to do with parser's state becoming invalid # under some circumstances. class _test: source1 = """ hello hi! """ source2 = """ """ rls_services_xml = """ http://xcap.example.com/resource-lists/users/sip:joe@example.com/index/~~/resource-lists/list%5b@name=%22l1%22%5d presence presence """ @staticmethod def trim(s0): "remove tail from the result" s = s0 while s and s[-1]!='>': s = s[:-1] if s: return s else: return s0 @classmethod def lxml_xpath_get(cls, xpath_expr, source=source1, namespace=None, namespaces={}): "First, use xpath from lxml, which should produce the same results for existing nodes" assert '/'.startswith(xpath_expr[:1]), xpath_expr doc = etree.parse(StringIO(source)) try: # where to put namespace? r = doc.xpath(xpath_expr, namespaces=namespaces) except etree.XPathEvalError: return uri.NodeParsingError except Exception, ex: traceback.print_exc() return ex if len(r)==1: return cls.trim(etree.tostring(r[0])) elif len(r)>1: return SelectorError @staticmethod def xcap_get(xpath_expr, source=source1, namespace=None, namespaces={}): "Second, use xpath_get_element" try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return get(source, selector) except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @staticmethod def xcap_put(xpath_expr, element, source=source1, namespace=None, namespaces={}): try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return put(source, selector, element)[0] except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @classmethod def test_get(cls): emph1 = 'Midwinter Spring' thomas = 'Thomas Eliot' ezra = 'Ezra Pound' hi = 'hi!' yesterday = '