diff --git a/sipsimple/payloads/__init__.py b/sipsimple/payloads/__init__.py index 48535dec..6851667b 100644 --- a/sipsimple/payloads/__init__.py +++ b/sipsimple/payloads/__init__.py @@ -1,1201 +1,1203 @@ __all__ = ['ParserError', 'BuilderError', 'ValidationError', 'IterateTypes', 'IterateIDs', 'IterateItems', 'All', 'parse_qname', 'XMLDocument', 'XMLAttribute', 'XMLElementID', 'XMLElementChild', 'XMLElementChoiceChild', 'XMLStringChoiceChild', 'XMLElement', 'XMLRootElement', 'XMLSimpleElement', 'XMLStringElement', 'XMLLocalizedStringElement', 'XMLBooleanElement', 'XMLByteElement', 'XMLUnsignedByteElement', 'XMLShortElement', 'XMLUnsignedShortElement', 'XMLIntElement', 'XMLUnsignedIntElement', 'XMLLongElement', 'XMLUnsignedLongElement', 'XMLIntegerElement', 'XMLPositiveIntegerElement', 'XMLNegativeIntegerElement', 'XMLNonNegativeIntegerElement', 'XMLNonPositiveIntegerElement', 'XMLDecimalElement', 'XMLDateTimeElement', 'XMLAnyURIElement', 'XMLEmptyElement', 'XMLEmptyElementRegistryType', 'XMLListElement', 'XMLListRootElement', 'XMLStringListElement'] import os import sys import urllib.request, urllib.parse, urllib.error from collections import defaultdict, deque from copy import deepcopy from decimal import Decimal from itertools import chain from weakref import WeakValueDictionary from application.python import Null from application.python.descriptor import classproperty from application.python.types import MarkerType from application.python.weakref import weakobjectmap from lxml import etree from sipsimple.payloads.datatypes import Boolean, Byte, UnsignedByte, Short, UnsignedShort, Int, UnsignedInt, Long, UnsignedLong from sipsimple.payloads.datatypes import PositiveInteger, NegativeInteger, NonNegativeInteger, NonPositiveInteger, DateTime, AnyURI from sipsimple.util import All ## Exceptions class ParserError(Exception): pass class BuilderError(Exception): pass class ValidationError(ParserError): pass ## Markers class IterateTypes(metaclass=MarkerType): pass class IterateIDs(metaclass=MarkerType): pass class IterateItems(metaclass=MarkerType): pass class StoredAttribute(metaclass=MarkerType): pass ## Utilities def parse_qname(qname): if qname[0] == '{': qname = qname[1:] return qname.split('}') else: return None, qname ## XMLDocument class XMLDocumentType(type): def __init__(cls, name, bases, dct): cls.nsmap = {} cls.schema_map = {} cls.element_map = {} cls.root_element = None cls.schema = None cls.parser = None for base in reversed(bases): if hasattr(base, 'element_map'): cls.element_map.update(base.element_map) if hasattr(base, 'schema_map'): cls.schema_map.update(base.schema_map) if hasattr(base, 'nsmap'): cls.nsmap.update(base.nsmap) cls._update_schema() def __setattr__(cls, name, value): if name == 'schema_path': if cls is not XMLDocument: raise AttributeError("%s can only be changed on XMLDocument" % name) super(XMLDocumentType, cls).__setattr__(name, value) def update_schema(document_class): document_class._update_schema() for document_subclass in document_class.__subclasses__(): update_schema(document_subclass) update_schema(XMLDocument) else: super(XMLDocumentType, cls).__setattr__(name, value) def _update_schema(cls): if cls.schema_map: location_map = {ns: urllib.parse.quote(os.path.abspath(os.path.join(cls.schema_path, schema_file)).replace('\\', '//')) for ns, schema_file in list(cls.schema_map.items())} schema = """ %s """ % '\r\n'.join('' % (namespace, schema_location) for namespace, schema_location in list(location_map.items())) cls.schema = etree.XMLSchema(etree.XML(schema)) cls.parser = etree.XMLParser(schema=cls.schema, remove_blank_text=True) else: cls.schema = None cls.parser = etree.XMLParser(remove_blank_text=True) class XMLDocument(object, metaclass=XMLDocumentType): encoding = 'UTF-8' content_type = None schema_path = os.path.join(os.path.dirname(__file__), 'xml-schemas') @classmethod def parse(cls, document): try: if isinstance(document, str): xml = etree.XML(document, parser=cls.parser) elif isinstance(document, str): xml = etree.XML(document.encode('utf-8'), parser=cls.parser) else: xml = etree.parse(document, parser=cls.parser).getroot() if cls.schema is not None: cls.schema.assertValid(xml) return cls.root_element.from_element(xml, xml_document=cls) except (etree.DocumentInvalid, etree.XMLSyntaxError, ValueError) as e: raise ParserError(str(e)) @classmethod def build(cls, root_element, encoding=None, pretty_print=False, validate=True): if type(root_element) is not cls.root_element: raise TypeError("can only build XML documents from root elements of type %s" % cls.root_element.__name__) element = root_element.to_element() if validate and cls.schema is not None: cls.schema.assertValid(element) # Cleanup namespaces and move element NS mappings to the global scope. normalized_element = etree.Element(element.tag, attrib=element.attrib, nsmap=dict(chain(iter(list(element.nsmap.items())), iter(list(cls.nsmap.items()))))) normalized_element.text = element.text normalized_element.tail = element.tail normalized_element.extend(deepcopy(child) for child in element) etree.cleanup_namespaces(normalized_element) return etree.tostring(normalized_element, encoding=encoding or cls.encoding, method='xml', xml_declaration=True, pretty_print=pretty_print) @classmethod def create(cls, build_kw={}, **kw): return cls.build(cls.root_element(**kw), **build_kw) @classmethod def register_element(cls, xml_class): cls.element_map[xml_class.qname] = xml_class for child in cls.__subclasses__(): child.register_element(xml_class) @classmethod def get_element(cls, qname, default=None): return cls.element_map.get(qname, default) @classmethod def register_namespace(cls, namespace, prefix=None, schema=None): if prefix in cls.nsmap: raise ValueError("prefix %s is already registered in %s" % (prefix, cls.__name__)) if namespace in iter(list(cls.nsmap.values())): raise ValueError("namespace %s is already registered in %s" % (namespace, cls.__name__)) cls.nsmap[prefix] = namespace if schema is not None: cls.schema_map[namespace] = schema cls._update_schema() for child in cls.__subclasses__(): child.register_namespace(namespace, prefix, schema) @classmethod def unregister_namespace(cls, namespace): try: prefix = next((prefix for prefix in cls.nsmap if cls.nsmap[prefix]==namespace)) except StopIteration: raise KeyError("namespace %s is not registered in %s" % (namespace, cls.__name__)) del cls.nsmap[prefix] schema = cls.schema_map.pop(namespace, None) if schema is not None: cls._update_schema() for child in cls.__subclasses__(): try: child.unregister_namespace(namespace) except KeyError: pass ## Children descriptors class XMLAttribute(object): def __init__(self, name, xmlname=None, type=str, default=None, required=False, test_equal=True, onset=None, ondel=None): self.name = name self.xmlname = xmlname or name self.type = type self.default = default self.__xmlparse__ = getattr(type, '__xmlparse__', lambda value: value) self.__xmlbuild__ = getattr(type, '__xmlbuild__', str) self.required = required self.test_equal = test_equal self.onset = onset self.ondel = ondel self.values = weakobjectmap() def __get__(self, obj, objtype): if obj is None: return self try: return self.values[obj] except KeyError: value = self.values.setdefault(obj, self.default) if value is not None: obj.element.set(self.xmlname, self.build(value)) return value def __set__(self, obj, value): if value is not None and not isinstance(value, self.type): value = self.type(value) old_value = self.values.get(obj, self.default) if value == old_value: return if value is not None: obj.element.set(self.xmlname, self.build(value)) else: obj.element.attrib.pop(self.xmlname, None) self.values[obj] = value obj.__dirty__ = True if self.onset: self.onset(obj, self, value) def __delete__(self, obj): obj.element.attrib.pop(self.xmlname, None) try: value = self.values.pop(obj) except KeyError: pass else: if value != self.default: obj.__dirty__ = True if self.ondel: self.ondel(obj, self) def parse(self, xmlvalue): return self.__xmlparse__(xmlvalue) def build(self, value): return self.__xmlbuild__(value) class XMLElementID(XMLAttribute): """An XMLAttribute that represents the ID of an element (immutable).""" def __set__(self, obj, value): if obj in self.values: raise AttributeError("An XML element ID cannot be changed") super(XMLElementID, self).__set__(obj, value) def __delete__(self, obj): raise AttributeError("An XML element ID cannot be deleted") class XMLElementChild(object): def __init__(self, name, type, required=False, test_equal=True, onset=None, ondel=None): self.name = name self.type = type self.required = required self.test_equal = test_equal self.onset = onset self.ondel = ondel self.values = weakobjectmap() def __get__(self, obj, objtype): if obj is None: return self try: return self.values[obj] except KeyError: return None def __set__(self, obj, value): if value is not None and not isinstance(value, self.type): value = self.type(value) same_value = False old_value = self.values.get(obj) if value is old_value: return elif value is not None and value == old_value: value.__dirty__ = old_value.__dirty__ same_value = True if old_value is not None: obj.element.remove(old_value.element) if value is not None: obj._insert_element(value.element) self.values[obj] = value if not same_value: obj.__dirty__ = True if self.onset: self.onset(obj, self, value) def __delete__(self, obj): try: old_value = self.values.pop(obj) except KeyError: pass else: if old_value is not None: obj.element.remove(old_value.element) obj.__dirty__ = True if self.ondel: self.ondel(obj, self) class XMLElementChoiceChildWrapper(object): __slots__ = ('descriptor', 'type') def __init__(self, descriptor, type): self.descriptor = descriptor self.type = type def __getattribute__(self, name): if name in ('descriptor', 'type', 'register_extension', 'unregister_extension'): return super(XMLElementChoiceChildWrapper, self).__getattribute__(name) else: return self.descriptor.__getattribute__(name) def __setattr__(self, name, value): if name in ('descriptor', 'type'): super(XMLElementChoiceChildWrapper, self).__setattr__(name, value) else: setattr(self.descriptor, name, value) def __dir__(self): return dir(self.descriptor) + ['descriptor', 'type', 'register_extension', 'unregister_extension'] def register_extension(self, type): if self.extension_type is None: raise ValueError("The %s XML choice element of %s does not support extensions" % (self.name, self.type.__name__)) if not issubclass(type, XMLElement) or not issubclass(type, self.extension_type): raise TypeError("type is not a subclass of XMLElement and/or %s: %s" % (self.extension_type.__name__, type.__name__)) if type in self.types: raise ValueError("%s is already registered as a choice extension" % type.__name__) self.types.add(type) self.type._xml_children_qname_map[type.qname] = (self.descriptor, type) for child_class in self.type.__subclasses__(): child_class._xml_children_qname_map[type.qname] = (self.descriptor, type) def unregister_extension(self, type): if self.extension_type is None: raise ValueError("The %s XML choice element of %s does not support extensions" % (self.name, self.type.__name__)) try: self.types.remove(type) except ValueError: raise ValueError("%s is not a registered choice extension on %s" % (type.__name__, self.type.__name__)) del self.type._xml_children_qname_map[type.qname] for child_class in self.type.__subclasses__(): del child_class._xml_children_qname_map[type.qname] class XMLElementChoiceChild(object): def __init__(self, name, types, extension_type=None, required=False, test_equal=True, onset=None, ondel=None): self.name = name self.types = set(types) self.extension_type = extension_type self.required = required self.test_equal = test_equal self.onset = onset self.ondel = ondel self.values = weakobjectmap() def __get__(self, obj, objtype): if obj is None: return XMLElementChoiceChildWrapper(self, objtype) try: return self.values[obj] except KeyError: return None def __set__(self, obj, value): if value is not None and type(value) not in self.types: raise TypeError("%s is not an acceptable type for %s" % (value.__class__.__name__, obj.__class__.__name__)) same_value = False old_value = self.values.get(obj) if value is old_value: return elif value is not None and value == old_value: value.__dirty__ = old_value.__dirty__ same_value = True if old_value is not None: obj.element.remove(old_value.element) if value is not None: obj._insert_element(value.element) self.values[obj] = value if not same_value: obj.__dirty__ = True if self.onset: self.onset(obj, self, value) def __delete__(self, obj): try: old_value = self.values.pop(obj) except KeyError: pass else: if old_value is not None: obj.element.remove(old_value.element) obj.__dirty__ = True if self.ondel: self.ondel(obj, self) class XMLStringChoiceChild(XMLElementChoiceChild): """ A choice between keyword strings from a registry, custom strings from the other type and custom extensions. This descriptor will accept and return strings instead of requiring XMLElement instances for the values in the registry and the other type. Check XMLEmptyElementRegistryType for a metaclass for building registries of XMLEmptyElement classes for keywords. """ def __init__(self, name, registry=None, other_type=None, extension_type=None): self.registry = registry self.other_type = other_type self.extension_type = extension_type types = registry.classes if registry is not None else () types += (other_type,) if other_type is not None else () super(XMLStringChoiceChild, self).__init__(name, types, extension_type=extension_type, required=True, test_equal=True) def __get__(self, obj, objtype): value = super(XMLStringChoiceChild, self).__get__(obj, objtype) if obj is None or objtype is StoredAttribute or value is None or isinstance(value, self.extension_type or ()): return value else: return str(value) def __set__(self, obj, value): if isinstance(value, str): if self.registry is not None and value in self.registry.names: value = self.registry.class_map[value]() elif self.other_type is not None: value = self.other_type.from_string(value) super(XMLStringChoiceChild, self).__set__(obj, value) ## XMLElement base classes class XMLElementBase(object): """ This class is used as a common ancestor for XML elements and provides the means for super() to find at least dummy implementations for the methods that are supposed to be implemented by subclasses, even when they are not implemented by any other ancestor class. This is necessary in order to simplify access to these methods when multiple inheritance is involved and none or only some of the classes implement them. The methods declared here should to be implemented in subclasses as necessary. """ def __get_dirty__(self): return False def __set_dirty__(self, dirty): return def _build_element(self): return def _parse_element(self, element): return class XMLElementType(type): def __init__(cls, name, bases, dct): super(XMLElementType, cls).__init__(name, bases, dct) # set dictionary of xml attributes and xml child elements cls._xml_attributes = {} cls._xml_element_children = {} cls._xml_children_qname_map = {} for base in reversed(bases): if hasattr(base, '_xml_attributes'): cls._xml_attributes.update(base._xml_attributes) if hasattr(base, '_xml_element_children') and hasattr(base, '_xml_children_qname_map'): cls._xml_element_children.update(base._xml_element_children) cls._xml_children_qname_map.update(base._xml_children_qname_map) for name, value in list(dct.items()): if isinstance(value, XMLElementID): if cls._xml_id is not None: raise AttributeError("Only one XMLElementID attribute can be defined in the %s class" % cls.__name__) cls._xml_id = value cls._xml_attributes[value.name] = value elif isinstance(value, XMLAttribute): cls._xml_attributes[value.name] = value elif isinstance(value, XMLElementChild): cls._xml_element_children[value.name] = value cls._xml_children_qname_map[value.type.qname] = (value, value.type) elif isinstance(value, XMLElementChoiceChild): cls._xml_element_children[value.name] = value for type in value.types: cls._xml_children_qname_map[type.qname] = (value, type) # register class in its XMLDocument if cls._xml_document is not None: cls._xml_document.register_element(cls) class XMLElement(XMLElementBase, metaclass=XMLElementType): _xml_tag = None # To be defined in subclass _xml_namespace = None # To be defined in subclass _xml_document = None # To be defined in subclass _xml_extension_type = None # Can be defined in subclass _xml_id = None # Can be defined in subclass, or will be set by the metaclass to the XMLElementID attribute (if present) _xml_children_order = {} # Can be defined in subclass # dynamically generated _xml_attributes = {} _xml_element_children = {} _xml_children_qname_map = {} qname = classproperty(lambda cls: '{%s}%s' % (cls._xml_namespace, cls._xml_tag)) def __init__(self): self.element = etree.Element(self.qname, nsmap=self._xml_document.nsmap) self.__dirty__ = True def __get_dirty__(self): return (self.__dict__['__dirty__'] or any(child.__dirty__ for child in (getattr(self, name) for name in self._xml_element_children) if child is not None) or super(XMLElement, self).__get_dirty__()) def __set_dirty__(self, dirty): super(XMLElement, self).__set_dirty__(dirty) if not dirty: for child in (child for child in (getattr(self, name) for name in self._xml_element_children) if child is not None): child.__dirty__ = dirty self.__dict__['__dirty__'] = dirty __dirty__ = property(__get_dirty__, __set_dirty__) def check_validity(self): # check attributes for name, attribute in list(self._xml_attributes.items()): # if attribute has default but it was not set, will also be added with this occasion value = getattr(self, name, None) if attribute.required and value is None: raise ValidationError("required attribute %s of %s is not set" % (name, self.__class__.__name__)) # check element children for name, element_child in list(self._xml_element_children.items()): # if child has default but it was not set, will also be added with this occasion child = getattr(self, name, None) if child is None and element_child.required: raise ValidationError("element child %s of %s is not set" % (name, self.__class__.__name__)) def to_element(self): try: self.check_validity() except ValidationError as e: raise BuilderError(str(e)) # build element children for name in self._xml_element_children: descriptor = getattr(self.__class__, name) child = descriptor.__get__(self, StoredAttribute) if child is not None: child.to_element() self._build_element() return self.element @classmethod def from_element(cls, element, xml_document=None): obj = cls.__new__(cls) obj._xml_document = xml_document if xml_document is not None else cls._xml_document obj.element = element # set known attributes for name, attribute in list(cls._xml_attributes.items()): xmlvalue = element.get(attribute.xmlname, None) if xmlvalue is not None: try: setattr(obj, name, attribute.parse(xmlvalue)) except (ValueError, TypeError): raise ValidationError("got illegal value for attribute %s of %s: %s" % (name, cls.__name__, xmlvalue)) # set element children for child in element: element_child, type = cls._xml_children_qname_map.get(child.tag, (None, None)) if element_child is not None: try: value = type.from_element(child, xml_document=obj._xml_document) except ValidationError: pass # we should accept partially valid documents else: setattr(obj, element_child.name, value) obj._parse_element(element) obj.check_validity() obj.__dirty__ = False return obj @classmethod def _register_xml_attribute(cls, attribute, element): cls._xml_element_children[attribute] = element cls._xml_children_qname_map[element.type.qname] = (element, element.type) for subclass in cls.__subclasses__(): subclass._register_xml_attribute(attribute, element) @classmethod def _unregister_xml_attribute(cls, attribute): element = cls._xml_element_children.pop(attribute) del cls._xml_children_qname_map[element.type.qname] for subclass in cls.__subclasses__(): subclass._unregister_xml_attribute(attribute) @classmethod def register_extension(cls, attribute, type, test_equal=True): if cls._xml_extension_type is None: raise ValueError("XMLElement type %s does not support extensions (requested extension type %s)" % (cls.__name__, type.__name__)) elif not issubclass(type, cls._xml_extension_type): raise TypeError("XMLElement type %s only supports extensions of type %s (requested extension type %s)" % (cls.__name__, cls._xml_extension_type, type.__name__)) elif hasattr(cls, attribute): raise ValueError("XMLElement type %s already has an attribute named %s (requested extension type %s)" % (cls.__name__, attribute, type.__name__)) extension = XMLElementChild(attribute, type=type, required=False, test_equal=test_equal) setattr(cls, attribute, extension) cls._register_xml_attribute(attribute, extension) @classmethod def unregister_extension(cls, attribute): if cls._xml_extension_type is None: raise ValueError("XMLElement type %s does not support extensions" % cls.__name__) cls._unregister_xml_attribute(attribute) delattr(cls, attribute) def _insert_element(self, element): if element in self.element: return order = self._xml_children_order.get(element.tag, self._xml_children_order.get(None, sys.maxsize)) for i in range(len(self.element)): child_order = self._xml_children_order.get(self.element[i].tag, self._xml_children_order.get(None, sys.maxsize)) if child_order > order: position = i break else: position = len(self.element) self.element.insert(position, element) def __eq__(self, other): if isinstance(other, XMLElement): if self is other: return True for name, attribute in list(self._xml_attributes.items()): if attribute.test_equal: if not hasattr(other, name) or getattr(self, name) != getattr(other, name): return False for name, element_child in list(self._xml_element_children.items()): if element_child.test_equal: if not hasattr(other, name) or getattr(self, name) != getattr(other, name): return False try: __eq__ = super(XMLElement, self).__eq__ except AttributeError: return True else: return __eq__(other) elif self._xml_id is not None: return self._xml_id == other else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __hash__(self): if self._xml_id is not None: return hash(self._xml_id) else: return object.__hash__(self) class XMLRootElementType(XMLElementType): def __init__(cls, name, bases, dct): super(XMLRootElementType, cls).__init__(name, bases, dct) if cls._xml_document is not None: if cls._xml_document.root_element is not None: raise TypeError('there is already a root element registered for %s' % cls.__name__) cls._xml_document.root_element = cls class XMLRootElement(XMLElement, metaclass=XMLRootElementType): def __init__(self): XMLElement.__init__(self) self.__cache__ = WeakValueDictionary({self.element: self}) @classmethod def from_element(cls, element, xml_document=None): obj = super(XMLRootElement, cls).from_element(element, xml_document) obj.__cache__ = WeakValueDictionary({obj.element: obj}) return obj @classmethod def parse(cls, document): return cls._xml_document.parse(document) def toxml(self, encoding=None, pretty_print=False, validate=True): return self._xml_document.build(self, encoding=encoding, pretty_print=pretty_print, validate=validate) def xpath(self, xpath, namespaces=None): result = [] try: nodes = self.element.xpath(xpath, namespaces=namespaces) except etree.XPathError: raise ValueError("illegal XPath expression") for element in (node for node in nodes if isinstance(node, etree._Element)): if element in self.__cache__: result.append(self.__cache__[element]) continue if element is self.element: self.__cache__[element] = self result.append(self) continue for ancestor in element.iterancestors(): if ancestor in self.__cache__: container = self.__cache__[ancestor] break else: container = self notvisited = deque([container]) visited = set() while notvisited: container = notvisited.popleft() self.__cache__[container.element] = container if isinstance(container, XMLListMixin): children = set(child for child in container if isinstance(child, XMLElement) and child not in visited) visited.update(children) notvisited.extend(children) for child in container._xml_element_children: value = getattr(container, child) if value is not None and value not in visited: visited.add(value) notvisited.append(value) if element in self.__cache__: result.append(self.__cache__[element]) return result def get_xpath(self, element): raise NotImplementedError def find_parent(self, element): raise NotImplementedError ## Mixin classes class ThisClass(object): """ Special marker class that is used to indicate that an XMLListElement subclass can be an item of itself. This is necessary because a class cannot reference itself when defining _xml_item_type """ class XMLListMixinType(type): def __init__(cls, name, bases, dct): super(XMLListMixinType, cls).__init__(name, bases, dct) if '_xml_item_type' in dct: cls._xml_item_type = cls._xml_item_type # trigger __setattr__ def __setattr__(cls, name, value): if name == '_xml_item_type': if value is ThisClass: value = cls elif isinstance(value, tuple) and ThisClass in value: value = tuple(cls if type is ThisClass else type for type in value) if value is None: cls._xml_item_element_types = () cls._xml_item_extension_types = () else: item_types = value if isinstance(value, tuple) else (value,) cls._xml_item_element_types = tuple(type for type in item_types if issubclass(type, XMLElement)) cls._xml_item_extension_types = tuple(type for type in item_types if not issubclass(type, XMLElement)) super(XMLListMixinType, cls).__setattr__(name, value) class XMLListMixin(XMLElementBase, metaclass=XMLListMixinType): """A mixin representing a list of other XML elements""" _xml_item_type = None def __new__(cls, *args, **kw): if cls._xml_item_type is None: raise TypeError("The %s class cannot be instantiated because it doesn't define the _xml_item_type attribute" % cls.__name__) instance = super(XMLListMixin, cls).__new__(cls) instance._element_map = {} instance._xmlid_map = defaultdict(dict) return instance def __contains__(self, item): return item in iter(list(self._element_map.values())) def __iter__(self): return (self._element_map[element] for element in self.element if element in self._element_map) def __len__(self): return len(self._element_map) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, list(self)) def __eq__(self, other): if isinstance(other, XMLListMixin): return self is other or (len(self) == len(other) and all(self_item == other_item for self_item, other_item in zip(self, other))) else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __getitem__(self, key): if key is IterateTypes: return (cls for cls, mapping in list(self._xmlid_map.items()) if mapping) if not isinstance(key, tuple): raise KeyError(key) try: cls, id = key except ValueError: raise KeyError(key) if id is IterateIDs: return iter(list(self._xmlid_map[cls].keys())) elif id is IterateItems: return iter(list(self._xmlid_map[cls].values())) else: return self._xmlid_map[cls][id] def __delitem__(self, key): if not isinstance(key, tuple): raise KeyError(key) try: cls, id = key except ValueError: raise KeyError(key) if id is All: for item in list(self._xmlid_map[cls].values()): self.remove(item) else: self.remove(self._xmlid_map[cls][id]) def __get_dirty__(self): return any(item.__dirty__ for item in list(self._element_map.values())) or super(XMLListMixin, self).__get_dirty__() def __set_dirty__(self, dirty): super(XMLListMixin, self).__set_dirty__(dirty) if not dirty: for item in list(self._element_map.values()): item.__dirty__ = dirty def _parse_element(self, element): super(XMLListMixin, self)._parse_element(element) self._element_map.clear() self._xmlid_map.clear() for child in element[:]: child_class = self._xml_document.get_element(child.tag, type(None)) if child_class in self._xml_item_element_types or issubclass(child_class, self._xml_item_extension_types): try: value = child_class.from_element(child, xml_document=self._xml_document) except ValidationError: pass else: if value._xml_id is not None and value._xml_id in self._xmlid_map[child_class]: element.remove(child) else: if value._xml_id is not None: self._xmlid_map[child_class][value._xml_id] = value self._element_map[value.element] = value def _build_element(self): super(XMLListMixin, self)._build_element() for child in list(self._element_map.values()): child.to_element() def add(self, item): if not (item.__class__ in self._xml_item_element_types or isinstance(item, self._xml_item_extension_types)): raise TypeError("%s cannot add items of type %s" % (self.__class__.__name__, item.__class__.__name__)) same_value = False if item._xml_id is not None and item._xml_id in self._xmlid_map[item.__class__]: old_item = self._xmlid_map[item.__class__][item._xml_id] if item is old_item: return elif item == old_item: item.__dirty__ = old_item.__dirty__ same_value = True self.element.remove(old_item.element) del self._xmlid_map[item.__class__][item._xml_id] del self._element_map[old_item.element] self._insert_element(item.element) if item._xml_id is not None: self._xmlid_map[item.__class__][item._xml_id] = item self._element_map[item.element] = item if not same_value: self.__dirty__ = True def remove(self, item): self.element.remove(item.element) if item._xml_id is not None: del self._xmlid_map[item.__class__][item._xml_id] del self._element_map[item.element] self.__dirty__ = True def update(self, sequence): for item in sequence: self.add(item) def clear(self): for item in list(self._element_map.values()): self.remove(item) ## Element types class XMLSimpleElement(XMLElement): _xml_value_type = None # To be defined in subclass def __new__(cls, *args, **kw): if cls._xml_value_type is None: raise TypeError("The %s class cannot be instantiated because it doesn't define the _xml_value_type attribute" % cls.__name__) return super(XMLSimpleElement, cls).__new__(cls) def __init__(self, value): XMLElement.__init__(self) self.value = value def __eq__(self, other): if isinstance(other, XMLSimpleElement): return self is other or self.value == other.value else: return self.value == other def __bool__(self): return bool(self.value) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.value) def __str__(self): return str(self.value) def __unicode__(self): return str(self.value) @property def value(self): return self.__dict__['value'] @value.setter def value(self, value): if not isinstance(value, self._xml_value_type): value = self._xml_value_type(value) if self.__dict__.get('value', Null) == value: return self.__dict__['value'] = value self.__dirty__ = True def _parse_element(self, element): super(XMLSimpleElement, self)._parse_element(element) value = element.text or '' if hasattr(self._xml_value_type, '__xmlparse__'): self.value = self._xml_value_type.__xmlparse__(value) else: self.value = self._xml_value_type(value) def _build_element(self): super(XMLSimpleElement, self)._build_element() if hasattr(self.value, '__xmlbuild__'): self.element.text = self.value.__xmlbuild__() else: self.element.text = str(self.value) class XMLStringElement(XMLSimpleElement): _xml_value_type = str # Can be overwritten in subclasses def __len__(self): return len(self.value) class XMLLocalizedStringElement(XMLStringElement): lang = XMLAttribute('lang', xmlname='{http://www.w3.org/XML/1998/namespace}lang', type=str, required=False, test_equal=True) def __init__(self, value, lang=None): XMLStringElement.__init__(self, value) self.lang = lang def __eq__(self, other): if isinstance(other, XMLLocalizedStringElement): return self is other or (self.lang == other.lang and self.value == other.value) elif self.lang is None: return XMLStringElement.__eq__(self, other) else: return NotImplemented def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.value, self.lang) def _parse_element(self, element): super(XMLLocalizedStringElement, self)._parse_element(element) self.lang = element.get('{http://www.w3.org/XML/1998/namespace}lang', None) class XMLBooleanElement(XMLSimpleElement): _xml_value_type = Boolean class XMLByteElement(XMLSimpleElement): _xml_value_type = Byte class XMLUnsignedByteElement(XMLSimpleElement): _xml_value_type = UnsignedByte class XMLShortElement(XMLSimpleElement): _xml_value_type = Short class XMLUnsignedShortElement(XMLSimpleElement): _xml_value_type = UnsignedShort class XMLIntElement(XMLSimpleElement): _xml_value_type = Int class XMLUnsignedIntElement(XMLSimpleElement): _xml_value_type = UnsignedInt class XMLLongElement(XMLSimpleElement): _xml_value_type = Long class XMLUnsignedLongElement(XMLSimpleElement): _xml_value_type = UnsignedLong class XMLIntegerElement(XMLSimpleElement): _xml_value_type = int class XMLPositiveIntegerElement(XMLSimpleElement): _xml_value_type = PositiveInteger class XMLNegativeIntegerElement(XMLSimpleElement): _xml_value_type = NegativeInteger class XMLNonNegativeIntegerElement(XMLSimpleElement): _xml_value_type = NonNegativeInteger class XMLNonPositiveIntegerElement(XMLSimpleElement): _xml_value_type = NonPositiveInteger class XMLDecimalElement(XMLSimpleElement): _xml_value_type = Decimal class XMLDateTimeElement(XMLSimpleElement): _xml_value_type = DateTime class XMLAnyURIElement(XMLStringElement): _xml_value_type = AnyURI class XMLEmptyElement(XMLElement): def __repr__(self): return '%s()' % self.__class__.__name__ def __eq__(self, other): return type(self) is type(other) or NotImplemented def __hash__(self): return hash(self.__class__) class XMLEmptyElementRegistryType(type): """A metaclass for building registries of XMLEmptyElement subclasses from names""" def __init__(cls, name, bases, dct): super(XMLEmptyElementRegistryType, cls).__init__(name, bases, dct) typename = getattr(cls, '__typename__', name.partition('Registry')[0]).capitalize() class BaseElementType(XMLEmptyElement): def __str__(self): return self._xml_tag def __unicode__(self): return str(self._xml_tag) cls.__basetype__ = BaseElementType cls.__basetype__.__name__ = 'Base%sType' % typename cls.class_map = {} for name in cls.names: class ElementType(BaseElementType): _xml_tag = name _xml_namespace = cls._xml_namespace _xml_document = cls._xml_document _xml_id = name - ElementType.__name__ = typename + name.title().translate(None, '-_') + + translation_table = dict.fromkeys(map(ord, '-_'), None) + ElementType.__name__ = typename + name.title().translate(translation_table) cls.class_map[name] = ElementType cls.classes = tuple(cls.class_map[name] for name in cls.names) ## Created using mixins class XMLListElementType(XMLElementType, XMLListMixinType): pass class XMLListRootElementType(XMLRootElementType, XMLListMixinType): pass class XMLListElement(XMLElement, XMLListMixin, metaclass=XMLListElementType): def __bool__(self): if self._xml_attributes or self._xml_element_children: return True else: return len(self._element_map) != 0 class XMLListRootElement(XMLRootElement, XMLListMixin, metaclass=XMLListRootElementType): def __bool__(self): if self._xml_attributes or self._xml_element_children: return True else: return len(self._element_map) != 0 class XMLStringListElementType(XMLListElementType): def __init__(cls, name, bases, dct): if cls._xml_item_type is not None: raise TypeError("The %s class should not define _xml_item_type, but define _xml_item_registry, _xml_item_other_type and _xml_item_extension_type instead" % cls.__name__) types = cls._xml_item_registry.classes if cls._xml_item_registry is not None else () types += tuple(type for type in (cls._xml_item_other_type, cls._xml_item_extension_type) if type is not None) cls._xml_item_type = types or None super(XMLStringListElementType, cls).__init__(name, bases, dct) class XMLStringListElement(XMLListElement, metaclass=XMLStringListElementType): _xml_item_registry = None _xml_item_other_type = None _xml_item_extension_type = None def __contains__(self, item): if isinstance(item, str): if self._xml_item_registry is not None and item in self._xml_item_registry.names: item = self._xml_item_registry.class_map[item]() elif self._xml_item_other_type is not None: item = self._xml_item_other_type.from_string(item) return item in iter(list(self._element_map.values())) def __iter__(self): return (item if isinstance(item, self._xml_item_extension_types) else str(item) for item in super(XMLStringListElement, self).__iter__()) def add(self, item): if isinstance(item, str): if self._xml_item_registry is not None and item in self._xml_item_registry.names: item = self._xml_item_registry.class_map[item]() elif self._xml_item_other_type is not None: item = self._xml_item_other_type.from_string(item) super(XMLStringListElement, self).add(item) def remove(self, item): if isinstance(item, str): if self._xml_item_registry is not None and item in self._xml_item_registry.names: xmlitem = self._xml_item_registry.class_map[item]() try: item = next((entry for entry in super(XMLStringListElement, self).__iter__() if entry == xmlitem)) except StopIteration: raise KeyError(item) elif self._xml_item_other_type is not None: xmlitem = self._xml_item_other_type.from_string(item) try: item = next((entry for entry in super(XMLStringListElement, self).__iter__() if entry == xmlitem)) except StopIteration: raise KeyError(item) super(XMLStringListElement, self).remove(item) diff --git a/sipsimple/payloads/datatypes.py b/sipsimple/payloads/datatypes.py index 6e232f2c..603fc0a7 100644 --- a/sipsimple/payloads/datatypes.py +++ b/sipsimple/payloads/datatypes.py @@ -1,230 +1,230 @@ """Data types used for simple XML elements and for XML attributes""" __all__ = ['Boolean', 'DateTime', 'Byte', 'UnsignedByte', 'Short', 'UnsignedShort', 'Int', 'UnsignedInt', 'Long', 'UnsignedLong', 'PositiveInteger', 'NegativeInteger', 'NonNegativeInteger', 'NonPositiveInteger', 'ID', 'AnyURI', 'SIPURI', 'XCAPURI'] import re import urllib.request, urllib.parse, urllib.error import urllib.parse from sipsimple.util import ISOTimestamp class Boolean(int): def __new__(cls, value): return int.__new__(cls, bool(value)) def __repr__(self): return 'True' if self else 'False' __str__ = __repr__ @classmethod def __xmlparse__(cls, value): if value in ('True', 'true'): return int.__new__(cls, 1) elif value in ('False', 'false'): return int.__new__(cls, 0) else: raise ValueError("Invalid boolean string representation: %s" % value) def __xmlbuild__(self): return 'true' if self else 'false' class DateTime(ISOTimestamp): pass class Byte(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-128 <= instance <= 127): raise ValueError("integer number must be a signed 8bit value") return instance class UnsignedByte(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 255): raise ValueError("integer number must be an unsigned 8bit value") return instance class Short(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-32768 <= instance <= 32767): raise ValueError("integer number must be a signed 16bit value") return instance class UnsignedShort(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 65535): raise ValueError("integer number must be an unsigned 16bit value") return instance -class Int(long): +class Int(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-2147483648 <= instance <= 2147483647): raise ValueError("integer number must be a signed 32bit value") return instance -class UnsignedInt(long): +class UnsignedInt(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 4294967295): raise ValueError("integer number must be an unsigned 32bit value") return instance -class Long(long): +class Long(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (-9223372036854775808 <= instance <= 9223372036854775807): raise ValueError("integer number must be a signed 64bit value") return instance -class UnsignedLong(long): +class UnsignedLong(int): def __new__(cls, value): instance = int.__new__(cls, value) if not (0 <= instance <= 18446744073709551615): raise ValueError("integer number must be an unsigned 64bit value") return instance -class PositiveInteger(long): +class PositiveInteger(int): def __new__(cls, value): instance = int.__new__(cls, value) if instance <= 0: raise ValueError("integer number must be a positive value") return instance -class NegativeInteger(long): +class NegativeInteger(int): def __new__(cls, value): instance = int.__new__(cls, value) if instance >= 0: raise ValueError("integer number must be a negative value") return instance -class NonNegativeInteger(long): +class NonNegativeInteger(int): def __new__(cls, value): instance = int.__new__(cls, value) if instance < 0: raise ValueError("integer number must be a non-negative value") return instance -class NonPositiveInteger(long): +class NonPositiveInteger(int): def __new__(cls, value): instance = int.__new__(cls, value) if instance > 0: raise ValueError("integer number must be a non-positive value") return instance class ID(str): _id_regex = re.compile(r'^[a-z_][a-z0-9_.-]*$', re.I) def __new__(cls, value): if not cls._id_regex.match(value): raise ValueError("illegal ID value: %s" % value) return str.__new__(cls, value) class AnyURI(str): @classmethod def __xmlparse__(cls, value): - return cls.__new__(cls, urllib.parse.unquote(value).decode('utf-8')) + return cls.__new__(cls, urllib.parse.unquote(value)) def __xmlbuild__(self): return urllib.parse.quote(self.encode('utf-8')) class SIPURI(AnyURI): _path_regex = re.compile(r'^((?P[^:@]+)(:(?P[^@]+))?@)?(?P.*)$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urllib.parse.urlparse(instance) if uri.scheme not in ('sip', 'sips'): raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.params = {} if uri.params: params = (param.split('=', 1) for param in uri.params.split(';')) for param in params: if not param[0]: raise ValueError("illegal SIP URI parameter name: %s" % param[0]) if len(param) == 1: param.append(None) elif '=' in param[1]: raise ValueError("illegal SIP URI parameter value: %s" % param[1]) instance.params[param[0]] = param[1] if uri.query: try: instance.headers = dict(header.split('=') for header in uri.query.split('&')) except ValueError: raise ValueError("illegal SIP URI headers: %s" % uri.query) else: for name, value in list(instance.headers.items()): if not name or not value: raise ValueError("illegal URI header: %s=%s" % (name, value)) else: instance.headers = {} return instance class XCAPURI(AnyURI): _path_regex = re.compile(r'^(?P/(([^/]+)/)*)?(?P[^/]+)/((?Pglobal)|(users/(?P[^/]+)))/(?P~?(([^~]+~)|([^~]+))*)(/~~(?P.*))?$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urllib.parse.urlparse(instance) if uri.scheme not in ('http', 'https', ''): raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.username = uri.username instance.password = uri.password instance.hostname = uri.hostname instance.port = uri.port instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.globaltree = instance.globaltree is not None if uri.query: try: instance.query = dict(header.split('=') for header in uri.query.split('&')) except ValueError: raise ValueError("illegal XCAP URI query string: %s" % uri.query) else: for name, value in list(instance.query.items()): if not name or not value: raise ValueError("illegal XCAP URI query parameter: %s=%s" % (name, value)) else: instance.query = {} return instance relative = property(lambda self: self.scheme == '') diff --git a/sipsimple/payloads/rpid.py b/sipsimple/payloads/rpid.py index ce600b10..8583a24c 100644 --- a/sipsimple/payloads/rpid.py +++ b/sipsimple/payloads/rpid.py @@ -1,700 +1,700 @@ """ RPID handling according to RFC4480 This module provides an extension to PIDF to support rich presence. """ __all__ = ['namespace', 'ActivityElement', 'MoodElement', 'PlaceTypeElement', 'PrivacyElement', 'RelationshipElement', 'ServiceClassElement', 'SphereElement', 'Note', 'Other', 'Activities', 'Mood', 'PlaceIs', 'AudioPlaceInformation', 'VideoPlaceInformation', 'TextPlaceInformation', 'PlaceType', 'AudioPrivacy', 'TextPrivacy', 'VideoPrivacy', 'Privacy', 'Relationship', 'ServiceClass', 'Sphere', 'StatusIcon', 'TimeOffset', 'UserInput', 'Class'] from lxml import etree from sipsimple.payloads import ValidationError, XMLElementType, XMLEmptyElementRegistryType, XMLAttribute, XMLElementChild, XMLStringChoiceChild from sipsimple.payloads import XMLElement, XMLEmptyElement, XMLStringElement, XMLLocalizedStringElement, XMLStringListElement from sipsimple.payloads.pidf import PIDFDocument, ServiceExtension, PersonExtension, DeviceExtension, Note, NoteMap, NoteList, Service, Person, Device from sipsimple.payloads.datatypes import UnsignedLong, DateTime, ID namespace = 'urn:ietf:params:xml:ns:pidf:rpid' PIDFDocument.register_namespace(namespace, prefix='rpid', schema='rpid.xsd') ## Marker mixins class ActivityElement(object): pass class MoodElement(object): pass class PlaceTypeElement(object): pass class PrivacyElement(object): pass class RelationshipElement(object): pass class ServiceClassElement(object): pass class SphereElement(object): pass ## Attribute value types class AudioPlaceValue(str): def __new__(cls, value): if value not in ('noisy', 'ok', 'quiet', 'unknown'): raise ValueError("illegal value for audio place-is") return str.__new__(cls, value) class VideoPlaceValue(str): def __new__(cls, value): if value not in ('toobright', 'ok', 'dark', 'unknown'): raise ValueError("illegal value for video place-is") return str.__new__(cls, value) class TextPlaceValue(str): def __new__(cls, value): if value not in ('uncomfortable', 'inappropriate', 'ok', 'unknown'): raise ValueError("illegal value for text place-is") return str.__new__(cls, value) class UserInputValue(str): def __new__(cls, value): if value not in ('active', 'idle'): raise ValueError("illegal value for user-input") return str.__new__(cls, value) ## Elements class RPIDNote(XMLLocalizedStringElement): _xml_tag = 'note' _xml_namespace = namespace _xml_document = PIDFDocument def __unicode__(self): return Note(self.value, self.lang) @classmethod def from_string(cls, value): if isinstance(value, Note): return cls(value, value.lang) elif isinstance(value, str): return cls(value) else: raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__) class RPIDOther(XMLLocalizedStringElement): _xml_tag = 'other' _xml_namespace = namespace _xml_document = PIDFDocument def __unicode__(self): return Other(self.value, self.lang) @classmethod def from_string(cls, value): if isinstance(value, Other): return cls(value, value.lang) elif isinstance(value, str): return cls(value) else: raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__) class Other(Note): pass class ActivityRegistry(object, metaclass=XMLEmptyElementRegistryType): _xml_namespace = namespace _xml_document = PIDFDocument names = ('appointment', 'away', 'breakfast', 'busy', 'dinner', 'holiday', 'in-transit', 'looking-for-work', 'meal', 'meeting', 'on-the-phone', 'performance', 'permanent-absence', 'playing', 'presentation', 'shopping', 'sleeping', 'spectator', 'steering', 'travel', 'tv', 'vacation', 'working', 'worship', 'unknown') class Activities(XMLStringListElement, PersonExtension): _xml_tag = 'activities' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0} _xml_item_registry = ActivityRegistry _xml_item_other_type = RPIDOther _xml_item_extension_type = ActivityElement id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, activities=[], notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.update(activities) self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Activities): return super(Activities, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes)) def _parse_element(self, element): super(Activities, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Activities, self)._build_element() self.notes._build_element() def add(self, activity): if isinstance(activity, str): if activity in self._xml_item_registry.names: activity = self._xml_item_registry.class_map[activity]() else: activity = self._xml_item_other_type.from_string(activity) unknown_activity = self._xml_item_registry.class_map['unknown']() if activity == unknown_activity or unknown_activity in iter(list(self._element_map.values())): self.clear() super(Activities, self).add(activity) def check_validity(self): if not self: raise ValidationError("Activity element must have at least one value") super(Activities, self).check_validity() Person.register_extension('activities', type=Activities) class MoodRegistry(object, metaclass=XMLEmptyElementRegistryType): _xml_namespace = namespace _xml_document = PIDFDocument names = ('afraid', 'amazed', 'angry', 'annoyed', 'anxious', 'ashamed', 'bored', 'brave', 'calm', 'cold', 'confused', 'contended', 'cranky', 'curious', 'depressed', 'disappointed', 'disgusted', 'distracted', 'embarrassed', 'excited', 'flirtatious', 'frustrated', 'grumpy', 'guilty', 'happy', 'hot', 'humbled', 'humiliated', 'hungry', 'hurt', 'impressed', 'in_awe', 'in_love', 'indignant', 'interested', 'invisible', 'jealous', 'lonely', 'mean', 'moody', 'nervous', 'neutral', 'offended', 'playful', 'proud', 'relieved', 'remorseful', 'restless', 'sad', 'sarcastic', 'serious', 'shocked', 'shy', 'sick', 'sleepy', 'stressed', 'surprised', 'thirsty', 'worried', 'unknown') class Mood(XMLStringListElement, PersonExtension): _xml_tag = 'mood' _xml_namespace = namespace _xml_document = PIDFDocument _xml_extension_type = MoodElement _xml_children_order = {RPIDNote.qname: 0} _xml_item_registry = MoodRegistry _xml_item_other_type = RPIDOther _xml_item_extension_type = MoodElement id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, moods=[], notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.update(moods) self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Mood): return super(Mood, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes)) def _parse_element(self, element): super(Mood, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Mood, self)._build_element() self.notes._build_element() def add(self, mood): if isinstance(mood, str): if mood in self._xml_item_registry.names: mood = self._xml_item_registry.class_map[mood]() else: mood = self._xml_item_other_type.from_string(mood) unknown_mood = self._xml_item_registry.class_map['unknown']() if mood == unknown_mood or unknown_mood in iter(list(self._element_map.values())): self.clear() super(Mood, self).add(mood) def check_validity(self): if not self: raise ValidationError("Mood element must have at least one value") super(Mood, self).check_validity() Person.register_extension('mood', type=Mood) class AudioPlaceInformation(XMLStringElement): _xml_tag = 'audio' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = AudioPlaceValue class VideoPlaceInformation(XMLStringElement): _xml_tag = 'video' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = VideoPlaceValue class TextPlaceInformation(XMLStringElement): _xml_tag = 'text' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = TextPlaceValue class PlaceIs(XMLElement, PersonExtension): _xml_tag = 'place-is' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0, AudioPlaceInformation.qname: 1, VideoPlaceInformation.qname: 2, TextPlaceInformation.qname: 3} id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) audio = XMLElementChild('audio', type=AudioPlaceInformation, required=False, test_equal=True) video = XMLElementChild('video', type=VideoPlaceInformation, required=False, test_equal=True) text = XMLElementChild('text', type=TextPlaceInformation, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, audio=None, video=None, text=None, notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.audio = audio self.video = video self.text = text self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, PlaceIs): return super(PlaceIs, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, self.audio, self.video, self.text, list(self.notes)) def _parse_element(self, element): self.notes._parse_element(element) def _build_element(self): self.notes._build_element() Person.register_extension('place_is', type=PlaceIs) class PlaceType(XMLElement, PersonExtension): _xml_tag = 'place-type' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0} id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) value = XMLStringChoiceChild('value', other_type=RPIDOther, extension_type=PlaceTypeElement) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, placetype=None, notes=[]): super(PlaceType, self).__init__() self.id = id self.since = since self.until = until self.value = placetype self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, PlaceType): return super(PlaceType, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, self.value, list(self.notes)) def _parse_element(self, element): self.notes._parse_element(element) def _build_element(self): self.notes._build_element() Person.register_extension('place_type', type=PlaceType) class AudioPrivacy(XMLEmptyElement): _xml_tag = 'audio' _xml_namespace = namespace _xml_document = PIDFDocument def __init__(self, private=True): XMLEmptyElement.__init__(self) def __new__(cls, private=True): if not private: return None return XMLEmptyElement.__new__(cls) class TextPrivacy(XMLEmptyElement): _xml_tag = 'text' _xml_namespace = namespace _xml_document = PIDFDocument def __init__(self, private=True): XMLEmptyElement.__init__(self) def __new__(cls, private=True): if not private: return None return XMLEmptyElement.__new__(cls) class VideoPrivacy(XMLEmptyElement): _xml_tag = 'video' _xml_namespace = namespace _xml_document = PIDFDocument def __init__(self, private=True): XMLEmptyElement.__init__(self) def __new__(cls, private=True): if not private: return None return XMLEmptyElement.__new__(cls) class PrivacyType(XMLElementType): def __init__(cls, name, bases, dct): super(PrivacyType, cls).__init__(name, bases, dct) child_attributes = (getattr(cls, name) for name in dir(cls) if type(getattr(cls, name)) is XMLElementChild) cls._privacy_attributes = tuple(attr.name for attr in child_attributes if attr.name in ('audio', 'text', 'video') or issubclass(attr.type, PrivacyElement)) class Privacy(XMLElement, PersonExtension, metaclass=PrivacyType): _xml_tag = 'privacy' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0, AudioPrivacy.qname: 1, TextPrivacy.qname: 2, VideoPrivacy.qname: 3} id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) audio = XMLElementChild('audio', type=AudioPrivacy, required=False, test_equal=True) text = XMLElementChild('text', type=TextPrivacy, required=False, test_equal=True) video = XMLElementChild('video', type=VideoPrivacy, required=False, test_equal=True) unknown = property(lambda self: all(getattr(self, name) is None for name in self._privacy_attributes)) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, notes=[], audio=False, text=False, video=False): super(Privacy, self).__init__() self.id = id self.since = since self.until = until self.audio = audio self.text = text self.video = video self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Privacy): return super(Privacy, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self.notes), self.audio, self.text, self.video) def _parse_element(self, element): self.notes._parse_element(element) def _build_element(self): if self.unknown: if self.element.find('{%s}unknown' % self._xml_namespace) is None: etree.SubElement(self.element, '{%s}unknown' % self._xml_namespace, nsmap=self._xml_document.nsmap) else: unknown_element = self.element.find('{%s}unknown' % self._xml_namespace) if unknown_element is not None: self.element.remove(unknown_element) self.notes._build_element() Person.register_extension('privacy', type=Privacy) class RelationshipRegistry(object, metaclass=XMLEmptyElementRegistryType): _xml_namespace = namespace _xml_document = PIDFDocument names = ('assistant', 'associate', 'family', 'friend', 'self', 'supervisor', 'unknown') class Relationship(XMLElement, ServiceExtension): _xml_tag = 'relationship' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote: 0} value = XMLStringChoiceChild('value', registry=RelationshipRegistry, other_type=RPIDOther, extension_type=RelationshipElement) _note_map = NoteMap() def __init__(self, relationship='self', notes=[]): XMLElement.__init__(self) self.value = relationship self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Relationship): return super(Relationship, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.value, list(self.notes)) def _parse_element(self, element): self.notes._parse_element(element) def _build_element(self): self.notes._build_element() Service.register_extension('relationship', type=Relationship) class ServiceClassRegistry(object, metaclass=XMLEmptyElementRegistryType): _xml_namespace = namespace _xml_document = PIDFDocument names = ('courier', 'electronic', 'freight', 'in-person', 'postal', 'unknown') class ServiceClass(XMLElement, ServiceExtension): _xml_tag = 'service-class' _xml_namespace = namespace _xml_document = PIDFDocument value = XMLStringChoiceChild('value', registry=ServiceClassRegistry, extension_type=ServiceClassElement) _note_map = NoteMap() def __init__(self, service_class=None, notes=[]): XMLElement.__init__(self) self.value = service_class self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, ServiceClass): return super(ServiceClass, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.value, list(self.notes)) def _parse_element(self, element): self.notes._parse_element(element) def _build_element(self): self.notes._build_element() Service.register_extension('service_class', type=ServiceClass) class SphereRegistry(object, metaclass=XMLEmptyElementRegistryType): _xml_namespace = namespace _xml_document = PIDFDocument names = ('home', 'work', 'unknown') class Sphere(XMLElement, PersonExtension): _xml_tag = 'sphere' _xml_namespace = namespace _xml_document = PIDFDocument id = XMLAttribute('id', type=ID, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) value = XMLStringChoiceChild('value', registry=SphereRegistry, extension_type=SphereElement) def __init__(self, value=None, id=None, since=None, until=None): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.value = value def __repr__(self): return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.value, self.id, self.since, self.until) Person.register_extension('sphere', type=Sphere) class StatusIcon(XMLStringElement, ServiceExtension, PersonExtension): _xml_tag = 'status-icon' _xml_namespace = namespace _xml_document = PIDFDocument id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) def __init__(self, value=None, id=None, since=None, until=None): XMLStringElement.__init__(self, value) self.id = id self.since = since self.until = until Person.register_extension('status_icon', type=StatusIcon) Service.register_extension('status_icon', type=StatusIcon) class TimeOffset(XMLStringElement, PersonExtension): _xml_tag = 'time-offset' _xml_namespace = namespace _xml_document = PIDFDocument id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) description = XMLAttribute('description', type=str, required=False, test_equal=True) def __init__(self, value=None, id=None, since=None, until=None, description=None): if value is None: - value = DateTime.now().utcoffset().seconds / 60 + value = int(DateTime.now().utcoffset().seconds / 60) XMLStringElement.__init__(self, str(value)) self.id = id self.since = since self.until = until self.description = description def __int__(self): return int(self.value) Person.register_extension('time_offset', type=TimeOffset) class UserInput(XMLStringElement, ServiceExtension, PersonExtension, DeviceExtension): _xml_tag = 'user-input' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = UserInputValue id = XMLAttribute('id', type=str, required=False, test_equal=True) last_input = XMLAttribute('last_input', xmlname='last-input', type=DateTime, required=False, test_equal=True) idle_threshold = XMLAttribute('idle_threshold', xmlname='idle-threshold', type=UnsignedLong, required=False, test_equal=True) def __init__(self, value='active', id=None, last_input=None, idle_threshold=None): XMLStringElement.__init__(self, value) self.id = id self.last_input = last_input self.idle_threshold = idle_threshold Service.register_extension('user_input', type=UserInput) Person.register_extension('user_input', type=UserInput) Device.register_extension('user_input', type=UserInput) class Class(XMLStringElement, ServiceExtension, PersonExtension, DeviceExtension): _xml_tag = 'class' _xml_namespace = namespace _xml_document = PIDFDocument Service.register_extension('rpid_class', type=Class) Person.register_extension('rpid_class', type=Class) Device.register_extension('rpid_class', type=Class)