class Boolean(int):
    """Integer-backed boolean used for simple XML element and attribute values.

    Constructing an instance coerces any value through ``bool()``; parsing
    from XML text goes through ``__xmlparse__`` and serialising back to XML
    text goes through ``__xmlbuild__``.
    """

    # Lexical forms accepted by __xmlparse__. 'true'/'false'/'1'/'0' are the
    # literals XML Schema defines for xs:boolean; the capitalised spellings
    # are kept so that input accepted before is still accepted.
    _true_literals = ('True', 'true', '1')
    _false_literals = ('False', 'false', '0')

    def __new__(cls, value):
        return int.__new__(cls, bool(value))

    def __repr__(self):
        return 'True' if self else 'False'

    __str__ = __repr__

    @classmethod
    def __xmlparse__(cls, value):
        """Build an instance from the XML text *value*.

        Raises ValueError when *value* is not one of the accepted literals.
        """
        if value in cls._true_literals:
            return int.__new__(cls, 1)
        elif value in cls._false_literals:
            return int.__new__(cls, 0)
        else:
            raise ValueError("Invalid boolean string representation: %s" % value)

    def __xmlbuild__(self):
        """Return the XML text for this value: u'true' or u'false'."""
        return u'true' if self else u'false'
class ID(str):
    """String that is validated against ``_id_regex`` on construction.

    A valid value starts with a character matched by ``[a-z_]`` and continues
    with characters matched by ``[a-z0-9_.-]`` (case-insensitively).
    Raises ValueError for anything else, including the empty string.
    """

    # Anchored with \Z rather than $: ``$`` also matches just before a
    # trailing newline, which would let a value such as "abc\n" through.
    _id_regex = re.compile(r'^[a-z_][a-z0-9_.-]*\Z', re.I)

    def __new__(cls, value):
        if not cls._id_regex.match(value):
            raise ValueError("illegal ID value: %s" % value)
        return str.__new__(cls, value)
def __xmlparse__(cls, value): return cls.__new__(cls, urllib.unquote(value).decode('utf-8')) def __xmlbuild__(self): return urllib.quote(self.encode('utf-8')) class SIPURI(AnyURI): _path_regex = re.compile(r'^((?P[^:@]+)(:(?P[^@]+))?@)?(?P.*)$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urlparse.urlparse(instance) if uri.scheme not in ('sip', 'sips'): raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.params = {} if uri.params: params = (param.split('=', 1) for param in uri.params.split(';')) for param in params: if not param[0]: raise ValueError("illegal SIP URI parameter name: %s" % param[0]) if len(param) == 1: param.append(None) elif '=' in param[1]: raise ValueError("illegal SIP URI parameter value: %s" % param[1]) instance.params[param[0]] = param[1] if uri.query: try: instance.headers = dict(header.split('=') for header in uri.query.split('&')) except ValueError: raise ValueError("illegal SIP URI headers: %s" % uri.query) else: for name, value in instance.headers.iteritems(): if not name or not value: raise ValueError("illegal URI header: %s=%s" % (name, value)) else: instance.headers = {} return instance class XCAPURI(AnyURI): _path_regex = re.compile(r'^(?P/(([^/]+)/)*)?(?P[^/]+)/((?Pglobal)|(users/(?P[^/]+)))/(?P~?(([^~]+~)|([^~]+))*)(/~~(?P.*))?$') def __new__(cls, value): instance = AnyURI.__new__(cls, value) uri = urlparse.urlparse(instance) if uri.scheme not in ('http', 'https', ''): raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme) instance.scheme = uri.scheme instance.username = uri.username instance.password = uri.password instance.hostname = uri.hostname instance.port = uri.port instance.__dict__.update(cls._path_regex.match(uri.path).groupdict()) instance.globaltree = instance.globaltree is not None if uri.query: try: instance.query = dict(header.split('=') for header in 
class BasicStatusValue(str):
    """Value type of the PIDF <basic> element: either 'open' or 'closed'."""

    def __new__(cls, value):
        # Accept only the two permitted literals; everything else is rejected.
        if value in ('closed', 'open'):
            return str.__new__(cls, value)
        raise ValueError('illegal BasicStatusValue')
self.xml_element.element if element in self.xml_element._note_map) def __len__(self): return len(self.xml_element._note_map) def __eq__(self, other): if isinstance(other, NoteList): return self is other or (len(self) == len(other) and all(self_item == other_item for self_item, other_item in izip(self, other))) else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def _parse_element(self, element): self.xml_element._note_map.clear() for child in element: if child.tag == self.note_type.qname: try: note = self.note_type.from_element(child, xml_document=self.xml_element._xml_document) except ValidationError: pass else: self.xml_element._note_map[note.element] = note def _build_element(self): for note in self.xml_element._note_map.itervalues(): note.to_element() def add(self, item): if isinstance(item, Note): item = self.note_type(item, item.lang) elif isinstance(item, basestring): item = self.note_type(item) if type(item) is not self.note_type: raise TypeError("%s cannot add notes of type %s" % (self.xml_element.__class__.__name__, item.__class__.__name__)) self.xml_element._insert_element(item.element) self.xml_element._note_map[item.element] = item self.xml_element.__dirty__ = True def remove(self, item): if isinstance(item, Note): try: item = (entry for entry in self.xml_element._note_map.itervalues() if unicode(entry) == item).next() except StopIteration: raise KeyError(item) elif isinstance(item, basestring): try: item = (entry for entry in self.xml_element._note_map.itervalues() if entry == item).next() except StopIteration: raise KeyError(item) if type(item) is not self.note_type: raise KeyError(item) self.xml_element.element.remove(item.element) del self.xml_element._note_map[item.element] self.xml_element.__dirty__ = True def update(self, sequence): for item in sequence: self.add(item) def clear(self): for item in self.xml_element._note_map.values(): self.remove(item) 
class Status(XMLElement):
    """PIDF <status> element holding an optional <basic> child."""

    _xml_tag = 'status'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = StatusExtension
    _xml_children_order = {Basic.qname: 0}

    basic = XMLElementChild('basic', type=Basic, required=False, test_equal=True)

    def __init__(self, basic=None):
        XMLElement.__init__(self)
        self.basic = basic

    def __repr__(self):
        class_name = self.__class__.__name__
        return '%s(%r)' % (class_name, self.basic)

    def check_validity(self):
        """Raise ValidationError if the element has no children, then defer to the base check."""
        child_count = len(self.element)
        if child_count == 0:
            raise ValidationError("Status objects must have at least one child")
        super(Status, self).check_validity()
contact self.timestamp = timestamp self.notes.update(notes) @property def notes(self): return NoteList(self, PIDFNote) def __eq__(self, other): if isinstance(other, Service): return super(Service, self).__eq__(other) and self.notes == other.notes else: return self.id == other def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.status, self.contact, self.timestamp) def _parse_element(self, element): super(Service, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Service, self)._build_element() self.notes._build_element() class DeviceTimestamp(XMLDateTimeElement): _xml_tag = 'timestamp' _xml_namespace = dm_namespace _xml_document = PIDFDocument class Device(XMLElement): _xml_tag = 'device' _xml_namespace = dm_namespace _xml_document = PIDFDocument _xml_extension_type = DeviceExtension _xml_children_order = {None: 0, DeviceID.qname: 1, DMNote.qname: 2, DeviceTimestamp.qname: 3} id = XMLElementID('id', type=ID, required=True, test_equal=True) device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True) timestamp = XMLElementChild('timestamp', type=DeviceTimestamp, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id, device_id=None, notes=[], timestamp=None): XMLElement.__init__(self) self.id = id self.device_id = device_id self.timestamp = timestamp self.notes.update(notes) @property def notes(self): return NoteList(self, DMNote) def __eq__(self, other): if isinstance(other, Device): return super(Device, self).__eq__(other) and self.notes == other.notes else: return self.id == other def __repr__(self): return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.device_id, list(self.notes), self.timestamp) def _parse_element(self, element): super(Device, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Device, self)._build_element() 
self.notes._build_element() class PersonTimestamp(XMLDateTimeElement): _xml_tag = 'timestamp' _xml_namespace = dm_namespace _xml_document = PIDFDocument class Person(XMLElement): _xml_tag = 'person' _xml_namespace = dm_namespace _xml_document = PIDFDocument _xml_extension_type = PersonExtension _xml_children_order = {None: 0, DMNote.qname: 1, PersonTimestamp.qname: 2} id = XMLElementID('id', type=ID, required=True, test_equal=True) timestamp = XMLElementChild('timestamp', type=PersonTimestamp, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id, notes=[], timestamp=None): XMLElement.__init__(self) self.id = id self.timestamp = timestamp self.notes.update(notes) @property def notes(self): return NoteList(self, DMNote) def __eq__(self, other): if isinstance(other, Person): return super(Person, self).__eq__(other) and self.notes == other.notes else: return self.id == other def __repr__(self): return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.timestamp) def _parse_element(self, element): super(Person, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Person, self)._build_element() self.notes._build_element() class PIDF(XMLListRootElement): _xml_tag = 'presence' _xml_namespace = pidf_namespace _xml_document = PIDFDocument _xml_children_order = {Service.qname: 0, PIDFNote.qname: 1, Person.qname: 2, Device.qname: 3} _xml_item_type = (Service, PIDFNote, Person, Device) entity = XMLAttribute('entity', type=AnyURI, required=True, test_equal=True) services = property(lambda self: (item for item in self if type(item) is Service)) notes = property(lambda self: (item for item in self if type(item) is Note)) persons = property(lambda self: (item for item in self if type(item) is Person)) devices = property(lambda self: (item for item in self if type(item) is Device)) def __init__(self, entity, elements=[]): XMLListRootElement.__init__(self) self.entity = entity 
class TimeOffset(XMLStringElement):
    """AG Projects <time-offset> element: an offset from UTC expressed in minutes.

    value       -- the offset; when omitted, the current local UTC offset
                   (as reported by DateTime.now().utcoffset()) is used.
    description -- optional free-form text stored in the 'description' attribute.
    """

    _xml_tag = 'time-offset'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument

    description = XMLAttribute('description', type=unicode, required=False, test_equal=True)

    def __init__(self, value=None, description=None):
        if value is None:
            # NOTE(review): assumes DateTime.now() is timezone-aware, i.e.
            # utcoffset() is not None -- confirm against ISOTimestamp.
            offset = DateTime.now().utcoffset()
            # A negative timedelta is normalised to days=-1 plus a positive
            # number of seconds, so .seconds alone would turn UTC-05:00 into
            # 1140 minutes instead of -300. Combine both fields to keep the sign.
            value = (offset.days * 86400 + offset.seconds) // 60
        XMLStringElement.__init__(self, str(value))
        self.description = description

    def __int__(self):
        return int(self.value)
""" __all__ = ['namespace', 'ActivityElement', 'MoodElement', 'PlaceTypeElement', 'PrivacyElement', 'RelationshipElement', 'ServiceClassElement', 'SphereElement', 'Note', 'Other', 'Activities', 'Mood', 'PlaceIs', 'AudioPlaceInformation', 'VideoPlaceInformation', 'TextPlaceInformation', 'PlaceType', 'AudioPrivacy', 'TextPrivacy', 'VideoPrivacy', 'Privacy', 'Relationship', 'ServiceClass', 'Sphere', 'StatusIcon', 'TimeOffset', 'UserInput', 'Class'] from lxml import etree from sipsimple.payloads import ValidationError, XMLElementType, XMLEmptyElementRegistryType, XMLAttribute, XMLElementChild, XMLStringChoiceChild from sipsimple.payloads import XMLElement, XMLEmptyElement, XMLStringElement, XMLLocalizedStringElement, XMLStringListElement from sipsimple.payloads.pidf import PIDFDocument, ServiceExtension, PersonExtension, DeviceExtension, Note, NoteMap, NoteList, Service, Person, Device from sipsimple.payloads.datatypes import UnsignedLong, DateTime, ID namespace = 'urn:ietf:params:xml:ns:pidf:rpid' PIDFDocument.register_namespace(namespace, prefix='rpid', schema='rpid.xsd') ## Marker mixins class ActivityElement(object): pass class MoodElement(object): pass class PlaceTypeElement(object): pass class PrivacyElement(object): pass class RelationshipElement(object): pass class ServiceClassElement(object): pass class SphereElement(object): pass ## Attribute value types class AudioPlaceValue(str): def __new__(cls, value): if value not in ('noisy', 'ok', 'quiet', 'unknown'): raise ValueError("illegal value for audio place-is") return str.__new__(cls, value) class VideoPlaceValue(str): def __new__(cls, value): if value not in ('toobright', 'ok', 'dark', 'unknown'): raise ValueError("illegal value for video place-is") return str.__new__(cls, value) class TextPlaceValue(str): def __new__(cls, value): if value not in ('uncomfortable', 'inappropriate', 'ok', 'unknown'): raise ValueError("illegal value for text place-is") return str.__new__(cls, value) class UserInputValue(str): 
def __new__(cls, value): if value not in ('active', 'idle'): raise ValueError("illegal value for user-input") return str.__new__(cls, value) ## Elements class RPIDNote(XMLLocalizedStringElement): _xml_tag = 'note' _xml_namespace = namespace _xml_document = PIDFDocument def __unicode__(self): return Note(self.value, self.lang) @classmethod def from_string(cls, value): if isinstance(value, Note): return cls(value, value.lang) elif isinstance(value, basestring): return cls(value) else: raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__) class RPIDOther(XMLLocalizedStringElement): _xml_tag = 'other' _xml_namespace = namespace _xml_document = PIDFDocument def __unicode__(self): return Other(self.value, self.lang) @classmethod def from_string(cls, value): if isinstance(value, Other): return cls(value, value.lang) elif isinstance(value, basestring): return cls(value) else: raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__) class Other(Note): pass class ActivityRegistry(object): __metaclass__ = XMLEmptyElementRegistryType _xml_namespace = namespace _xml_document = PIDFDocument names = ('appointment', 'away', 'breakfast', 'busy', 'dinner', 'holiday', 'in-transit', 'looking-for-work', 'meal', 'meeting', 'on-the-phone', 'performance', 'permanent-absence', 'playing', 'presentation', 'shopping', 'sleeping', 'spectator', 'steering', 'travel', 'tv', 'vacation', 'working', 'worship', 'unknown') class Activities(XMLStringListElement, PersonExtension): _xml_tag = 'activities' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0} _xml_item_registry = ActivityRegistry _xml_item_other_type = RPIDOther _xml_item_extension_type = ActivityElement id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, 
test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, activities=[], notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.update(activities) self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Activities): return super(Activities, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes)) def _parse_element(self, element): super(Activities, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Activities, self)._build_element() self.notes._build_element() def add(self, activity): if isinstance(activity, basestring): if activity in self._xml_item_registry.names: activity = self._xml_item_registry.class_map[activity]() else: activity = self._xml_item_other_type.from_string(activity) unknown_activity = self._xml_item_registry.class_map['unknown']() if activity == unknown_activity or unknown_activity in self._element_map.itervalues(): self.clear() super(Activities, self).add(activity) def check_validity(self): if not self: raise ValidationError("Activity element must have at least one value") super(Activities, self).check_validity() Person.register_extension('activities', type=Activities) class MoodRegistry(object): __metaclass__ = XMLEmptyElementRegistryType _xml_namespace = namespace _xml_document = PIDFDocument names = ('afraid', 'amazed', 'angry', 'annoyed', 'anxious', 'ashamed', 'bored', 'brave', 'calm', 'cold', 'confused', 'contended', 'cranky', 'curious', 'depressed', 'disappointed', 'disgusted', 'distracted', 'embarrassed', 'excited', 'flirtatious', 'frustrated', 'grumpy', 'guilty', 'happy', 'hot', 'humbled', 'humiliated', 'hungry', 'hurt', 'impressed', 'in_awe', 'in_love', 'indignant', 'interested', 
'invisible', 'jealous', 'lonely', 'mean', 'moody', 'nervous', 'neutral', 'offended', 'playful', 'proud', 'relieved', 'remorseful', 'restless', 'sad', 'sarcastic', 'serious', 'shocked', 'shy', 'sick', 'sleepy', 'stressed', 'surprised', 'thirsty', 'worried', 'unknown') class Mood(XMLStringListElement, PersonExtension): _xml_tag = 'mood' _xml_namespace = namespace _xml_document = PIDFDocument _xml_extension_type = MoodElement _xml_children_order = {RPIDNote.qname: 0} _xml_item_registry = MoodRegistry _xml_item_other_type = RPIDOther _xml_item_extension_type = MoodElement id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, moods=[], notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.update(moods) self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, Mood): return super(Mood, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes)) def _parse_element(self, element): super(Mood, self)._parse_element(element) self.notes._parse_element(element) def _build_element(self): super(Mood, self)._build_element() self.notes._build_element() def add(self, mood): if isinstance(mood, basestring): if mood in self._xml_item_registry.names: mood = self._xml_item_registry.class_map[mood]() else: mood = self._xml_item_other_type.from_string(mood) unknown_mood = self._xml_item_registry.class_map['unknown']() if mood == unknown_mood or unknown_mood in self._element_map.itervalues(): self.clear() super(Mood, self).add(mood) def check_validity(self): if not 
self: raise ValidationError("Mood element must have at least one value") super(Mood, self).check_validity() Person.register_extension('mood', type=Mood) class AudioPlaceInformation(XMLStringElement): _xml_tag = 'audio' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = AudioPlaceValue class VideoPlaceInformation(XMLStringElement): _xml_tag = 'video' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = VideoPlaceValue class TextPlaceInformation(XMLStringElement): _xml_tag = 'text' _xml_namespace = namespace _xml_document = PIDFDocument _xml_value_type = TextPlaceValue class PlaceIs(XMLElement, PersonExtension): _xml_tag = 'place-is' _xml_namespace = namespace _xml_document = PIDFDocument _xml_children_order = {RPIDNote.qname: 0, AudioPlaceInformation.qname: 1, VideoPlaceInformation.qname: 2, TextPlaceInformation.qname: 3} id = XMLAttribute('id', type=str, required=False, test_equal=True) since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True) until = XMLAttribute('until', type=DateTime, required=False, test_equal=True) audio = XMLElementChild('audio', type=AudioPlaceInformation, required=False, test_equal=True) video = XMLElementChild('video', type=VideoPlaceInformation, required=False, test_equal=True) text = XMLElementChild('text', type=TextPlaceInformation, required=False, test_equal=True) _note_map = NoteMap() def __init__(self, id=None, since=None, until=None, audio=None, video=None, text=None, notes=[]): XMLElement.__init__(self) self.id = id self.since = since self.until = until self.audio = audio self.video = video self.text = text self.notes.update(notes) @property def notes(self): return NoteList(self, RPIDNote) def __eq__(self, other): if isinstance(other, PlaceIs): return super(PlaceIs, self).__eq__(other) and self.notes == other.notes else: return NotImplemented def __repr__(self): return '%s(%r, %r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, 
class AudioPrivacy(XMLEmptyElement):
    """Empty RPID <audio> element used as a child of <privacy>.

    Calling the class with a false *private* argument yields None instead
    of an element instance.
    """

    _xml_tag = 'audio'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __new__(cls, private=True):
        # No element object is created at all when the flag is off.
        return XMLEmptyElement.__new__(cls) if private else None

    def __init__(self, private=True):
        # The flag is consumed by __new__; nothing else to store here.
        XMLEmptyElement.__init__(self)
class PrivacyType(XMLElementType):
    """Metaclass that records the names of a class's privacy child attributes.

    After the class is created, ``_privacy_attributes`` holds a tuple with
    the ``name`` of every class attribute whose type is exactly
    ``XMLElementChild`` and which is either called ``audio``, ``text`` or
    ``video`` or whose child type derives from ``PrivacyElement``.
    """

    def __init__(cls, name, bases, dct):
        super(PrivacyType, cls).__init__(name, bases, dct)
        collected = []
        for attr_name in dir(cls):
            candidate = getattr(cls, attr_name)
            # Exact type match on purpose: subclasses of XMLElementChild are
            # not considered.
            if type(candidate) is not XMLElementChild:
                continue
            if candidate.name in ('audio', 'text', 'video') or issubclass(candidate.type, PrivacyElement):
                collected.append(candidate.name)
        cls._privacy_attributes = tuple(collected)
class Relationship(XMLElement, ServiceExtension):
    """RPID ``relationship`` element.

    Holds a single choice value (one of the names in
    ``RelationshipRegistry``, an ``RPIDOther`` or a ``RelationshipElement``
    extension) plus a list of RPID notes.
    """

    _xml_tag = 'relationship'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    # Keyed by the note's qualified tag name, like the other RPID elements in
    # this module (PlaceType, Privacy); keying by the RPIDNote class itself
    # would never match a child tag, so notes would not be ordered first.
    _xml_children_order = {RPIDNote.qname: 0}

    value = XMLStringChoiceChild('value', registry=RelationshipRegistry, other_type=RPIDOther, extension_type=RelationshipElement)

    _note_map = NoteMap()

    def __init__(self, relationship='self', notes=[]):
        """Create the element; ``relationship`` is stored as ``value``.

        ``notes`` is only read (passed to ``self.notes.update``), never
        mutated, so the shared default list is harmless here.
        """
        XMLElement.__init__(self)
        self.value = relationship
        self.notes.update(notes)

    @property
    def notes(self):
        """A fresh ``NoteList`` view over this element's RPID notes."""
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, Relationship):
            return super(Relationship, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.value, list(self.notes))

    def _parse_element(self, element):
        # Parsing is delegated entirely to the note list.
        self.notes._parse_element(element)

    def _build_element(self):
        # Building is delegated entirely to the note list.
        self.notes._build_element()
class Sphere(XMLElement, PersonExtension):
    """RPID ``sphere`` element.

    Carries an optional ``id``, an optional validity interval (``since`` is
    serialized as the ``from`` XML attribute) and a single choice value taken
    from ``SphereRegistry`` or a ``SphereElement`` extension.
    """

    _xml_tag = 'sphere'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    id = XMLAttribute('id', type=ID, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    value = XMLStringChoiceChild('value', registry=SphereRegistry, extension_type=SphereElement)

    def __init__(self, value=None, id=None, since=None, until=None):
        XMLElement.__init__(self)
        # Assigned in the same order as before: id, since, until, value.
        for attribute, setting in (('id', id), ('since', since), ('until', until), ('value', value)):
            setattr(self, attribute, setting)

    def __repr__(self):
        # Fields are listed in the same order as the constructor arguments.
        fields = (self.__class__.__name__, self.value, self.id, self.since, self.until)
        return '%s(%r, %r, %r, %r)' % fields
class TimeOffset(XMLStringElement, PersonExtension):
    """RPID ``time-offset`` element.

    The element text is the offset stored as a string; ``int(element)``
    converts it back. When no value is given, the local offset from UTC in
    minutes (as reported by ``DateTime.now().utcoffset()``) is used.
    """

    _xml_tag = 'time-offset'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    description = XMLAttribute('description', type=str, required=False, test_equal=True)

    def __init__(self, value=None, id=None, since=None, until=None, description=None):
        if value is None:
            # utcoffset() returns a timedelta. A negative timedelta is
            # normalized to days=-1 plus a positive number of seconds, so
            # reading .seconds alone turns UTC-5 into 1140 minutes instead of
            # -300. Folding in .days gives the signed offset.
            offset = DateTime.now().utcoffset()
            value = offset.days * 1440 + offset.seconds // 60
        XMLStringElement.__init__(self, str(value))
        self.id = id
        self.since = since
        self.until = until
        self.description = description

    def __int__(self):
        return int(self.value)
type=UnsignedLong, required=False, test_equal=True) def __init__(self, value='active', id=None, last_input=None, idle_threshold=None): XMLStringElement.__init__(self, value) self.id = id self.last_input = last_input self.idle_threshold = idle_threshold Service.register_extension('user_input', type=UserInput) Person.register_extension('user_input', type=UserInput) Device.register_extension('user_input', type=UserInput) class Class(XMLStringElement, ServiceExtension, PersonExtension, DeviceExtension): _xml_tag = 'class' _xml_namespace = namespace _xml_document = PIDFDocument Service.register_extension('rpid_class', type=Class) Person.register_extension('rpid_class', type=Class) Device.register_extension('rpid_class', type=Class) diff --git a/sipsimple/streams/applications/chat.py b/sipsimple/streams/applications/chat.py index ce1b088f..996c5d87 100644 --- a/sipsimple/streams/applications/chat.py +++ b/sipsimple/streams/applications/chat.py @@ -1,252 +1,252 @@ # Copyright (C) 2008-2011 AG Projects. See LICENSE for details. 
class ChatIdentity(object):
    """A chat participant: a URI plus an optional display name.

    Two identities are equal when the user and host parts of their URIs
    match; the display name does not take part in the comparison. An
    identity can also be compared with a ``BaseSIPURI`` or with a string,
    which is parsed with ``SIPURI.parse`` first.
    """

    def __init__(self, uri, display_name=None):
        self.uri = uri
        self.display_name = display_name

    def __eq__(self, other):
        # Reduce every supported operand to the URI-like object to compare.
        if isinstance(other, ChatIdentity):
            peer = other.uri
        elif isinstance(other, BaseSIPURI):
            peer = other
        elif isinstance(other, basestring):
            try:
                peer = SIPURI.parse(other)
            except Exception:
                # A string that cannot be parsed never equals an identity.
                return False
        else:
            return NotImplemented
        return self.uri.user == peer.user and self.uri.host == peer.host

    def __ne__(self, other):
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome

    def __unicode__(self):
        if not self.display_name:
            return u'<%s>' % self.uri
        return u'%s <%s>' % (self.display_name, self.uri)
class CPIMIdentity(ChatIdentity):
    """A chat identity that can be parsed from a message/cpim header value."""

    # Optional (possibly quoted) display name followed by a sip:/sips: URI in
    # angle brackets. The group names must match the keys parse() reads from
    # groupdict(): 'display_name' and 'uri'.
    _re_format = re.compile(r'^("?(?P<display_name>[^<]*[^"\s])"?)?\s*<(?P<uri>sips?:.+)>$')

    @classmethod
    def parse(cls, value):
        """Build an identity from a header value such as ``Alice <sip:a@b>``.

        A byte string is first decoded with the ``cpim-headers`` codec.
        Raises ``ValueError`` when the value does not match the expected
        format.
        """
        if isinstance(value, str):
            value = value.decode('cpim-headers')
        match = cls._re_format.match(value)
        if not match:
            raise ValueError('Cannot parse message/cpim identity header value: %r' % value)
        groupdict = match.groupdict()
        display_name = groupdict['display_name']
        uri = groupdict['uri']
        uri = SIPURI.parse(str(uri)) # FIXME: SIPURI is not unicode friendly and expects a str. -Luci
        return cls(uri, display_name)
else: headers.append(u'NS: <%s>' % header.namespace) namespaces[header.namespace.prefix] = header.namespace if header.namespace.prefix: headers.append(u'%s.%s: %s' % (header.namespace.prefix, header.name, header.value)) else: headers.append(u'%s: %s' % (header.name, header.value)) headers.append(u'') headers = '\r\n'.join(s.encode('cpim-headers') for s in headers) message = Message() message.set_type(self.content_type) if isinstance(self.body, unicode): message.set_param('charset', 'utf-8') message.set_payload(self.body.encode('utf-8')) else: message.set_payload(self.body) return headers + '\r\n' + message.as_string() @classmethod def parse(cls, string): message = cls('', None) try: headers_end = string.index('\r\n\r\n') except ValueError: raise CPIMParserError('Invalid CPIM message') else: headers = cls.headers_re.findall(buffer(string, 0, headers_end+2)) body = buffer(string, headers_end+4) namespaces = {u'': Namespace(cls.standard_namespace, u'')} subjects = {} for prefix, name, value in headers: if '.' 
in name: continue namespace = namespaces.get(prefix) if not namespace: continue try: value = value.decode('cpim-headers') if name == 'From' and namespace == cls.standard_namespace: message.sender = CPIMIdentity.parse(value) elif name == 'To' and namespace == cls.standard_namespace: message.recipients.append(CPIMIdentity.parse(value)) elif name == 'cc' and namespace == cls.standard_namespace: message.courtesy_recipients.append(CPIMIdentity.parse(value)) elif name == 'Subject' and namespace == cls.standard_namespace: match = cls.subject_re.match(value) if match is None: raise ValueError('Illegal Subject header: %r' % value) lang, subject = match.groups() # language tags must be ASCII subjects[str(lang) if lang is not None else None] = subject elif name == 'DateTime' and namespace == cls.standard_namespace: - message.timestamp = Timestamp.parse(value) + message.timestamp = ISOTimestamp(value) elif name == 'Required' and namespace == cls.standard_namespace: message.required.extend(re.split(r'\s*,\s*', value)) elif name == 'NS' and namespace == cls.standard_namespace: match = cls.namespace_re.match(value) if match is None: raise ValueError('Illegal NS header: %r' % value) prefix, uri = match.groups() namespaces[prefix] = Namespace(uri, prefix) else: message.additional_headers.append(CPIMHeader(name, namespace, value)) except ValueError: pass if None in subjects: message.subject = MultilingualText(subjects.pop(None), **subjects) else: message.subject = MultilingualText(**subjects) mime_message = Parser().parsestr(body) message.content_type = mime_message.get_content_type() if message.content_type.startswith('multipart/') or message.content_type == 'message/rfc822': message.body = mime_message.get_payload() elif message.content_type.startswith('text/'): message.body = mime_message.get_payload().decode(mime_message.get_content_charset() or 'utf-8') else: message.body = mime_message.get_payload() if message.content_type is None: raise CPIMParserError("CPIM message missing 
class ISOTimestamp(datetime):
    """A ``datetime`` whose string form is its ISO 8601 representation.

    It can be created from:

    * another ``ISOTimestamp``, which is returned unchanged;
    * a ``datetime``; a naive one is given the local timezone;
    * a string, parsed with ``dateutil.parser.parse``; the timezone found in
      the string is kept as is, so a string without one yields a naive
      timestamp;
    * the regular ``datetime`` constructor arguments; the local timezone is
      used unless ``tzinfo`` is passed, either by keyword or as the eighth
      positional argument.
    """

    def __new__(cls, *args, **kw):
        if len(args) == 1:
            value = args[0]
            if isinstance(value, cls):
                return value
            if isinstance(value, datetime):
                tzinfo = value.tzinfo if value.tzinfo is not None else tzlocal()
                return cls(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, tzinfo)
            if isinstance(value, basestring):
                value = dateutil.parser.parse(value)
                return cls(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, value.tzinfo)
            # Any other single argument is handed to datetime untouched.
            return datetime.__new__(cls, *args, **kw)
        if len(args) < 8 and 'tzinfo' not in kw:
            kw['tzinfo'] = tzlocal()
        return datetime.__new__(cls, *args, **kw)

    def __str__(self):
        return self.isoformat()

    @classmethod
    def now(cls, tz=None):
        """Return the current time, in ``tz`` or the local timezone.

        ``tz`` is optional so that the signature stays compatible with
        ``datetime.now(tz=None)``; calling ``now()`` without arguments
        behaves as before.
        """
        return cls(datetime.now(tz if tz is not None else tzlocal()))

    @classmethod
    def utcnow(cls):
        """Return the current time as a timezone-aware UTC timestamp."""
        return cls(datetime.now(tzutc()))
translations.get('en', u'') obj = unicode.__new__(cls, default) obj.translations = translations return obj def get_translation(self, language): return self.translations.get(language, self) -class Timestamp(datetime): - _timestamp_re = re.compile(r'(?P\d{4})-(?P\d{2})-(?P\d{2})T(?P\d{2}):(?P\d{2}):(?P\d{2})(\.(?P\d{1,}))?((?PZ)|((?P\+|-)(?P\d{2}):(?P\d{2})))') - - @classmethod - def utc_offset(cls): - timediff = datetime.now() - datetime.utcnow() - return int(round((timediff.days*86400 + timediff.seconds + timediff.microseconds/1000000.0)/60)) - - @classmethod - def parse(cls, stamp): - if stamp is None: - return None - match = cls._timestamp_re.match(stamp) - if match is None: - raise ValueError("Timestamp %s is not in RFC3339 format" % stamp) - dct = match.groupdict() - if dct['UTC'] is not None: - secoffset = 0 - else: - secoffset = int(dct['tzminute'])*60 + int(dct['tzhour'])*3600 - if dct['tzsign'] == '-': - secoffset *= -1 - tzinfo = tzoffset(None, secoffset) - if dct['secfrac'] is not None: - secfrac = dct['secfrac'][:6] - secfrac += '0'*(6-len(secfrac)) - secfrac = int(secfrac) - else: - secfrac = 0 - dt = datetime(int(dct['year']), month=int(dct['month']), day=int(dct['day']), - hour=int(dct['hour']), minute=int(dct['minute']), second=int(dct['second']), - microsecond=secfrac, tzinfo=tzinfo) - return cls(dt) - - @classmethod - def format(cls, dt): - if dt is None: - return None - if dt.tzinfo is not None: - return dt.replace(microsecond=0).isoformat() - minutes = cls.utc_offset() - if minutes == 0: - tzspec = 'Z' - else: - if minutes < 0: - sign = '-' - minutes *= -1 - else: - sign = '+' - hours = minutes / 60 - minutes = minutes % 60 - tzspec = '%s%02d:%02d' % (sign, hours, minutes) - return dt.replace(microsecond=0).isoformat()+tzspec - - def __new__(cls, value, *args, **kwargs): - if isinstance(value, cls): - return value - elif isinstance(value, datetime): - return cls(value.year, month=value.month, day=value.day, - hour=value.hour, minute=value.minute, 
class UserInfo(object):
    """Singleton exposing the current user's login name and full name.

    Both properties return the name decoded with the filesystem encoding.
    On Windows both are taken from the ``USERNAME`` environment variable;
    elsewhere they come from the password database entry of the current uid.
    """

    __metaclass__ = Singleton

    def __repr__(self):
        pairs = ['%s=%r' % (attr, getattr(self, attr)) for attr in ('username', 'fullname')]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(pairs))

    @property
    def username(self):
        if platform.system() != 'Windows':
            # pwd only exists on Unix, hence the local import.
            import pwd
            raw = pwd.getpwuid(os.getuid()).pw_name
        else:
            raw = os.getenv('USERNAME')
        return raw.decode(sys.getfilesystemencoding())

    @property
    def fullname(self):
        if platform.system() != 'Windows':
            # pwd only exists on Unix, hence the local import.
            import pwd
            entry = pwd.getpwuid(os.getuid())
            # The first comma-separated GECOS field is the full name; fall
            # back to the login name when it is empty.
            raw = entry.pw_gecos.split(',', 1)[0] or entry.pw_name
        else:
            raw = os.getenv('USERNAME')
        return raw.decode(sys.getfilesystemencoding())