Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/payloads/datatypes.py b/sipsimple/payloads/datatypes.py
index 7c74d3c4..ccf1516f 100644
--- a/sipsimple/payloads/datatypes.py
+++ b/sipsimple/payloads/datatypes.py
@@ -1,238 +1,233 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Data types used for simple XML elements and for XML attributes"""
__all__ = ['Boolean', 'DateTime', 'Byte', 'UnsignedByte', 'Short', 'UnsignedShort', 'Int', 'UnsignedInt', 'Long', 'UnsignedLong',
'PositiveInteger', 'NegativeInteger', 'NonNegativeInteger', 'NonPositiveInteger', 'ID', 'AnyURI', 'SIPURI', 'XCAPURI']
import re
import urllib
import urlparse
-from sipsimple.util import Timestamp
+from sipsimple.util import ISOTimestamp
class Boolean(int):
    """Boolean stored as an int (0 or 1) with XML parse/build hooks.

    Instances print as ``True``/``False`` and are built into XML as
    ``true``/``false``.
    """

    # Lexical forms accepted when parsing. XML Schema's boolean allows
    # true/false/1/0; the capitalized spellings are an existing leniency.
    _true_strings = ('True', 'true', '1')
    _false_strings = ('False', 'false', '0')

    def __new__(cls, value):
        """Create the instance from the truthiness of value."""
        return int.__new__(cls, bool(value))

    def __repr__(self):
        return 'True' if self else 'False'

    __str__ = __repr__

    @classmethod
    def __xmlparse__(cls, value):
        """Convert the XML string value into a Boolean.

        Raises ValueError when value is not one of the accepted spellings.
        """
        if value in cls._true_strings:
            return int.__new__(cls, 1)
        elif value in cls._false_strings:
            return int.__new__(cls, 0)
        else:
            raise ValueError("Invalid boolean string representation: %s" % value)

    def __xmlbuild__(self):
        """Return the XML string form: u'true' or u'false'."""
        return u'true' if self else u'false'
-class DateTime(Timestamp):
- @classmethod
- def __xmlparse__(cls, value):
- return cls.parse(value)
-
- def __xmlbuild__(self):
- return self.format(self)
class DateTime(ISOTimestamp):
    """Date/time value type; adds nothing on top of ISOTimestamp."""
class Byte(int):
    """Integer restricted to the signed 8 bit range (-128 to 127)."""

    def __new__(cls, value):
        number = int.__new__(cls, value)
        if number < -128 or number > 127:
            raise ValueError("integer number must be a signed 8bit value")
        return number
class UnsignedByte(int):
    """Integer restricted to the unsigned 8 bit range (0 to 255)."""

    def __new__(cls, value):
        number = int.__new__(cls, value)
        if number < 0 or number > 255:
            raise ValueError("integer number must be an unsigned 8bit value")
        return number
class Short(int):
    """Integer restricted to the signed 16 bit range (-32768 to 32767)."""

    def __new__(cls, value):
        number = int.__new__(cls, value)
        if number < -32768 or number > 32767:
            raise ValueError("integer number must be a signed 16bit value")
        return number
class UnsignedShort(int):
    """Integer restricted to the unsigned 16 bit range (0 to 65535)."""

    def __new__(cls, value):
        number = int.__new__(cls, value)
        if number < 0 or number > 65535:
            raise ValueError("integer number must be an unsigned 16bit value")
        return number
class Int(long):
    """Integer restricted to the signed 32 bit range."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if number < -2147483648 or number > 2147483647:
            raise ValueError("integer number must be a signed 32bit value")
        return number
class UnsignedInt(long):
    """Integer restricted to the unsigned 32 bit range."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if number < 0 or number > 4294967295:
            raise ValueError("integer number must be an unsigned 32bit value")
        return number
class Long(long):
    """Integer restricted to the signed 64 bit range."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if number < -9223372036854775808 or number > 9223372036854775807:
            raise ValueError("integer number must be a signed 64bit value")
        return number
class UnsignedLong(long):
    """Integer restricted to the unsigned 64 bit range."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if number < 0 or number > 18446744073709551615:
            raise ValueError("integer number must be an unsigned 64bit value")
        return number
class PositiveInteger(long):
    """Integer that must be strictly greater than zero."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if not number > 0:
            raise ValueError("integer number must be a positive value")
        return number
class NegativeInteger(long):
    """Integer that must be strictly less than zero."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if not number < 0:
            raise ValueError("integer number must be a negative value")
        return number
class NonNegativeInteger(long):
    """Integer that must be zero or greater."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if not number >= 0:
            raise ValueError("integer number must be a non-negative value")
        return number
class NonPositiveInteger(long):
    """Integer that must be zero or less."""

    def __new__(cls, value):
        number = long.__new__(cls, value)
        if not number <= 0:
            raise ValueError("integer number must be a non-positive value")
        return number
class ID(str):
    """String that must look like a simple identifier.

    The value has to start with an ASCII letter or underscore and may continue
    with ASCII letters, digits, underscores, dots and dashes.
    """

    # \Z (not $) anchors at the very end of the string, so a value ending in a
    # newline is rejected instead of being silently accepted.
    _id_regex = re.compile(r'^[a-z_][a-z0-9_.-]*\Z', re.I)

    def __new__(cls, value):
        """Create the ID; raises ValueError if value does not match the pattern."""
        if not cls._id_regex.match(value):
            raise ValueError("illegal ID value: %s" % value)
        return str.__new__(cls, value)
class AnyURI(unicode):
@classmethod
def __xmlparse__(cls, value):
return cls.__new__(cls, urllib.unquote(value).decode('utf-8'))
def __xmlbuild__(self):
return urllib.quote(self.encode('utf-8'))
class SIPURI(AnyURI):
    """AnyURI that only accepts the sip/sips schemes and exposes the parsed parts.

    __new__ stores these attributes on the instance: scheme, the groups of the
    path regex (username, password, domain), params (dict) and headers (dict).
    """

    # Optional "username[:password]@" prefix; whatever follows is the domain group
    _path_regex = re.compile(r'^((?P<username>[^:@]+)(:(?P<password>[^@]+))?@)?(?P<domain>.*)$')

    def __new__(cls, value):
        """Create the URI from value and parse it into its components.

        Raises ValueError when the scheme is not sip/sips, when a parameter has
        an empty name or a value containing '=', or when the headers cannot be
        split into name=value pairs or have an empty name or value.
        """
        instance = AnyURI.__new__(cls, value)
        # NOTE(review): relies on urlparse splitting params and query for sip(s) URIs - confirm on the target Python version
        uri = urlparse.urlparse(instance)
        if uri.scheme not in ('sip', 'sips'):
            raise ValueError("illegal scheme for SIP URI: %s" % uri.scheme)
        instance.scheme = uri.scheme
        # copies the named regex groups (username, password, domain) onto the instance
        instance.__dict__.update(cls._path_regex.match(uri.path).groupdict())
        instance.params = {}
        if uri.params:
            # each entry becomes [name] or [name, value]
            params = (param.split('=', 1) for param in uri.params.split(';'))
            for param in params:
                if not param[0]:
                    raise ValueError("illegal SIP URI parameter name: %s" % param[0])
                if len(param) == 1:
                    # parameter without a value is stored as None
                    param.append(None)
                elif '=' in param[1]:
                    raise ValueError("illegal SIP URI parameter value: %s" % param[1])
                instance.params[param[0]] = param[1]
        if uri.query:
            try:
                # dict() raises ValueError unless every header splits into exactly two parts
                instance.headers = dict(header.split('=') for header in uri.query.split('&'))
            except ValueError:
                raise ValueError("illegal SIP URI headers: %s" % uri.query)
            else:
                for name, value in instance.headers.iteritems():
                    if not name or not value:
                        raise ValueError("illegal URI header: %s=%s" % (name, value))
        else:
            instance.headers = {}
        return instance
class XCAPURI(AnyURI):
    """AnyURI for XCAP documents and nodes, with the parsed parts as attributes.

    __new__ stores these attributes on the instance: scheme, username,
    password, hostname, port, the groups of the path regex (root, auid,
    globaltree, userstree, document, node) and query (dict). globaltree is
    converted to a bool. The ``relative`` property is true when there is no
    scheme.
    """

    _path_regex = re.compile(r'^(?P<root>/(([^/]+)/)*)?(?P<auid>[^/]+)/((?P<globaltree>global)|(users/(?P<userstree>[^/]+)))/(?P<document>~?(([^~]+~)|([^~]+))*)(/~~(?P<node>.*))?$')

    def __new__(cls, value):
        """Create the URI from value and parse it into its components.

        Raises ValueError when the scheme is not http, https or empty, when the
        path does not have the XCAP layout expected by the path regex, or when
        the query string cannot be split into name=value pairs or has an empty
        name or value.
        """
        instance = AnyURI.__new__(cls, value)
        uri = urlparse.urlparse(instance)
        if uri.scheme not in ('http', 'https', ''):
            raise ValueError("illegal scheme for XCAP URI: %s" % uri.scheme)
        instance.scheme = uri.scheme
        instance.username = uri.username
        instance.password = uri.password
        instance.hostname = uri.hostname
        instance.port = uri.port
        path_match = cls._path_regex.match(uri.path)
        if path_match is None:
            # Report a bad path the same way as every other validation failure
            # here, rather than failing with AttributeError on None.groupdict()
            raise ValueError("illegal path for XCAP URI: %s" % uri.path)
        instance.__dict__.update(path_match.groupdict())
        instance.globaltree = instance.globaltree is not None
        if uri.query:
            try:
                # dict() raises ValueError unless every item splits into exactly two parts
                instance.query = dict(header.split('=') for header in uri.query.split('&'))
            except ValueError:
                raise ValueError("illegal XCAP URI query string: %s" % uri.query)
            else:
                for name, value in instance.query.iteritems():
                    if not name or not value:
                        raise ValueError("illegal XCAP URI query parameter: %s=%s" % (name, value))
        else:
            instance.query = {}
        return instance

    relative = property(lambda self: self.scheme == '')
diff --git a/sipsimple/payloads/pidf.py b/sipsimple/payloads/pidf.py
index 0255dafe..47326a8e 100644
--- a/sipsimple/payloads/pidf.py
+++ b/sipsimple/payloads/pidf.py
@@ -1,530 +1,530 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""PIDF handling according to RFC3863 and RFC4479"""
__all__ = ['pidf_namespace',
'dm_namespace',
'PIDFDocument',
'ServiceExtension',
'DeviceExtension',
'PersonExtension',
'StatusExtension',
'Note',
'DeviceID',
'Status',
'Basic',
'Contact',
'ServiceTimestamp',
'Service',
'DeviceTimestamp',
'Device',
'PersonTimestamp',
'Person',
'PIDF',
# Extensions
'ExtendedStatus',
'DeviceInfo']
from itertools import izip
from application.python.weakref import weakobjectmap
from sipsimple.payloads import ValidationError, XMLDocument, XMLListRootElement, XMLListElement, XMLElement, XMLAttribute, XMLElementID, XMLElementChild
from sipsimple.payloads import XMLStringElement, XMLLocalizedStringElement, XMLDateTimeElement, XMLAnyURIElement
from sipsimple.payloads.datatypes import AnyURI, ID, DateTime
# XML namespaces of the PIDF elements and of the presence data model elements
pidf_namespace = 'urn:ietf:params:xml:ns:pidf'
dm_namespace = 'urn:ietf:params:xml:ns:pidf:data-model'


class PIDFDocument(XMLDocument):
    """XML document type for application/pidf+xml payloads."""
    content_type = 'application/pidf+xml'


# The pidf namespace is registered without a prefix, the data model one as 'dm'
PIDFDocument.register_namespace(pidf_namespace, prefix=None, schema='pidf.xsd')
PIDFDocument.register_namespace(dm_namespace, prefix='dm', schema='data-model.xsd')
## Marker mixin
# Empty classes used only to tag other classes. In this module they appear as
# the _xml_extension_type of Service, Device, Person and Status, and
# ServiceItemExtension is one of the item types accepted by Service.
class ServiceExtension(object): pass
class ServiceItemExtension(object): pass
class DeviceExtension(object): pass
class PersonExtension(object): pass
class StatusExtension(object): pass
## Attribute value types
class BasicStatusValue(str):
    """String limited to the two basic status values: 'closed' and 'open'."""

    def __new__(cls, value):
        if value in ('closed', 'open'):
            return str.__new__(cls, value)
        raise ValueError('illegal BasicStatusValue')
## General elements
class Note(unicode):
    """Unicode text with an optional language tag kept in the ``lang`` attribute.

    Two notes are equal when both text and lang match; a note equals a plain
    string only when it has no lang.
    """

    def __new__(cls, value, lang=None):
        note = unicode.__new__(cls, value)
        note.lang = lang
        return note

    def __repr__(self):
        text = unicode.__repr__(self)
        return "%s(%s, lang=%r)" % (self.__class__.__name__, text, self.lang)

    def __eq__(self, other):
        # Note must be tested first: it is itself a basestring subclass
        if isinstance(other, Note):
            return unicode.__eq__(self, other) and self.lang == other.lang
        if isinstance(other, basestring):
            return self.lang is None and unicode.__eq__(self, other)
        return NotImplemented

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result
class PIDFNote(XMLLocalizedStringElement):
    """<note> element in the pidf namespace."""
    _xml_tag = 'note'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        # returns a Note (unicode subclass) built from this element's value and lang
        return Note(self.value, self.lang)
class DMNote(XMLLocalizedStringElement):
    """<note> element in the presence data model namespace."""
    _xml_tag = 'note'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        # returns a Note (unicode subclass) built from this element's value and lang
        return Note(self.value, self.lang)
class NoteMap(object):
    """Descriptor to be used for _note_map attributes on XML elements with notes"""

    def __init__(self):
        self.object_map = weakobjectmap()

    def __get__(self, obj, type):
        # Class access returns the descriptor; instance access returns that
        # instance's own dict, created on first use.
        if obj is not None:
            per_object = self.object_map
            try:
                notes = per_object[obj]
            except KeyError:
                notes = per_object.setdefault(obj, {})
            return notes
        return self

    def __set__(self, obj, value):
        raise AttributeError("cannot set attribute")

    def __delete__(self, obj):
        raise AttributeError("cannot delete attribute")
class NoteList(object):
    """Collection-like view over the notes of an XML element.

    The notes live in the element's _note_map, keyed by the underlying note
    element; note_type is the XML element class used for the notes. Iteration
    yields unicode(note) for every child of the XML element present in the map.
    """

    def __init__(self, xml_element, note_type):
        self.xml_element = xml_element
        self.note_type = note_type

    def __contains__(self, item):
        # Note is checked first because it is also a basestring
        if isinstance(item, Note):
            item = self.note_type(item, item.lang)
        elif isinstance(item, basestring):
            item = self.note_type(item)
        return item in self.xml_element._note_map.itervalues()

    def __iter__(self):
        owner = self.xml_element
        return (unicode(owner._note_map[child]) for child in owner.element if child in owner._note_map)

    def __len__(self):
        return len(self.xml_element._note_map)

    def __eq__(self, other):
        if not isinstance(other, NoteList):
            return NotImplemented
        if self is other:
            return True
        if len(self) != len(other):
            return False
        return all(mine == theirs for mine, theirs in izip(self, other))

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result

    def _parse_element(self, element):
        """Rebuild the note map from the note_type children of element."""
        self.xml_element._note_map.clear()
        for child in element:
            if child.tag != self.note_type.qname:
                continue
            try:
                note = self.note_type.from_element(child, xml_document=self.xml_element._xml_document)
            except ValidationError:
                # notes that fail validation are skipped
                continue
            self.xml_element._note_map[note.element] = note

    def _build_element(self):
        for note in self.xml_element._note_map.itervalues():
            note.to_element()

    def add(self, item):
        """Add a Note, a plain string or a note_type instance; anything else raises TypeError."""
        if isinstance(item, Note):
            item = self.note_type(item, item.lang)
        elif isinstance(item, basestring):
            item = self.note_type(item)
        if type(item) is not self.note_type:
            raise TypeError("%s cannot add notes of type %s" % (self.xml_element.__class__.__name__, item.__class__.__name__))
        self.xml_element._insert_element(item.element)
        self.xml_element._note_map[item.element] = item
        self.xml_element.__dirty__ = True

    def remove(self, item):
        """Remove the first note matching item; raises KeyError when none matches."""
        if isinstance(item, Note):
            for entry in self.xml_element._note_map.itervalues():
                if unicode(entry) == item:
                    item = entry
                    break
            else:
                raise KeyError(item)
        elif isinstance(item, basestring):
            for entry in self.xml_element._note_map.itervalues():
                if entry == item:
                    item = entry
                    break
            else:
                raise KeyError(item)
        if type(item) is not self.note_type:
            raise KeyError(item)
        self.xml_element.element.remove(item.element)
        del self.xml_element._note_map[item.element]
        self.xml_element.__dirty__ = True

    def update(self, sequence):
        for item in sequence:
            self.add(item)

    def clear(self):
        for item in self.xml_element._note_map.values():
            self.remove(item)
class DeviceID(XMLStringElement):
    """<deviceID> string element in the presence data model namespace."""
    _xml_tag = 'deviceID'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
## Service elements
class Basic(XMLStringElement):
    """<basic> element in the pidf namespace; its value type is BasicStatusValue."""
    _xml_tag = 'basic'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_value_type = BasicStatusValue
class Status(XMLElement):
    """<status> element in the pidf namespace with an optional <basic> child."""
    _xml_tag = 'status'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = StatusExtension
    _xml_children_order = {Basic.qname: 0}

    basic = XMLElementChild('basic', type=Basic, required=False, test_equal=True)

    def __init__(self, basic=None):
        XMLElement.__init__(self)
        self.basic = basic

    def check_validity(self):
        """Raise ValidationError when the element has no children, then run the base checks."""
        if len(self.element) == 0:
            raise ValidationError("Status objects must have at least one child")
        super(Status, self).check_validity()

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.basic)
class Contact(XMLAnyURIElement):
    """<contact> URI element in the pidf namespace with an optional float priority attribute."""
    _xml_tag = 'contact'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument

    priority = XMLAttribute('priority', type=float, required=False, test_equal=False)
class ServiceTimestamp(XMLDateTimeElement):
    """<timestamp> date/time element in the pidf namespace (used by Service)."""
    _xml_tag = 'timestamp'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
class Service(XMLListElement):
    """<tuple> element in the pidf namespace.

    Has a required id and status, an optional contact and timestamp, a
    collection of notes (see the notes property) and accepts DeviceID and
    ServiceItemExtension items.
    """
    _xml_tag = 'tuple'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = ServiceExtension
    _xml_item_type = (DeviceID, ServiceItemExtension)
    _xml_children_order = {Status.qname: 0,
                           None: 1,
                           Contact.qname: 2,
                           PIDFNote.qname: 3,
                           ServiceTimestamp.qname: 4}

    id = XMLElementID('id', type=ID, required=True, test_equal=True)
    status = XMLElementChild('status', type=Status, required=True, test_equal=True)
    contact = XMLElementChild('contact', type=Contact, required=False, test_equal=True)
    timestamp = XMLElementChild('timestamp', type=ServiceTimestamp, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id, notes=[], status=None, contact=None, timestamp=None):
        # the default notes list is only iterated, never modified
        XMLListElement.__init__(self)
        self.id = id
        self.status = status
        self.contact = contact
        self.timestamp = timestamp
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, PIDFNote)

    def __eq__(self, other):
        # compared with anything that is not a Service, only the id is compared
        if isinstance(other, Service):
            return super(Service, self).__eq__(other) and self.notes == other.notes
        else:
            return self.id == other

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.status, self.contact, self.timestamp)

    def _parse_element(self, element):
        super(Service, self)._parse_element(element)
        self.notes._parse_element(element)

    def _build_element(self):
        super(Service, self)._build_element()
        self.notes._build_element()
class DeviceTimestamp(XMLDateTimeElement):
    """<timestamp> date/time element in the data model namespace (used by Device)."""
    _xml_tag = 'timestamp'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
class Device(XMLElement):
    """<device> element in the data model namespace.

    Has a required id, an optional device_id and timestamp and a collection of
    notes (see the notes property).
    """
    _xml_tag = 'device'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = DeviceExtension
    _xml_children_order = {None: 0,
                           DeviceID.qname: 1,
                           DMNote.qname: 2,
                           DeviceTimestamp.qname: 3}

    id = XMLElementID('id', type=ID, required=True, test_equal=True)
    device_id = XMLElementChild('device_id', type=DeviceID, required=False, test_equal=True)
    timestamp = XMLElementChild('timestamp', type=DeviceTimestamp, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id, device_id=None, notes=[], timestamp=None):
        # the default notes list is only iterated, never modified
        XMLElement.__init__(self)
        self.id = id
        self.device_id = device_id
        self.timestamp = timestamp
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, DMNote)

    def __eq__(self, other):
        # compared with anything that is not a Device, only the id is compared
        if isinstance(other, Device):
            return super(Device, self).__eq__(other) and self.notes == other.notes
        else:
            return self.id == other

    def __repr__(self):
        return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.device_id, list(self.notes), self.timestamp)

    def _parse_element(self, element):
        super(Device, self)._parse_element(element)
        self.notes._parse_element(element)

    def _build_element(self):
        super(Device, self)._build_element()
        self.notes._build_element()
class PersonTimestamp(XMLDateTimeElement):
    """<timestamp> date/time element in the data model namespace (used by Person)."""
    _xml_tag = 'timestamp'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
class Person(XMLElement):
    """<person> element in the data model namespace.

    Has a required id, an optional timestamp and a collection of notes (see
    the notes property).
    """
    _xml_tag = 'person'
    _xml_namespace = dm_namespace
    _xml_document = PIDFDocument
    _xml_extension_type = PersonExtension
    _xml_children_order = {None: 0,
                           DMNote.qname: 1,
                           PersonTimestamp.qname: 2}

    id = XMLElementID('id', type=ID, required=True, test_equal=True)
    timestamp = XMLElementChild('timestamp', type=PersonTimestamp, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id, notes=[], timestamp=None):
        # the default notes list is only iterated, never modified
        XMLElement.__init__(self)
        self.id = id
        self.timestamp = timestamp
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, DMNote)

    def __eq__(self, other):
        # compared with anything that is not a Person, only the id is compared
        if isinstance(other, Person):
            return super(Person, self).__eq__(other) and self.notes == other.notes
        else:
            return self.id == other

    def __repr__(self):
        return '%s(%r, %r, %r)' % (self.__class__.__name__, self.id, list(self.notes), self.timestamp)

    def _parse_element(self, element):
        super(Person, self)._parse_element(element)
        self.notes._parse_element(element)

    def _build_element(self):
        super(Person, self)._build_element()
        self.notes._build_element()
class PIDF(XMLListRootElement):
    """<presence> root element of a PIDF document.

    Holds Service, PIDFNote, Person and Device items and has a required entity
    attribute. Notes are accepted and returned as Note objects: they are
    wrapped into PIDFNote elements when added and converted back with
    unicode() when iterating.
    """
    _xml_tag = 'presence'
    _xml_namespace = pidf_namespace
    _xml_document = PIDFDocument
    _xml_children_order = {Service.qname: 0,
                           PIDFNote.qname: 1,
                           Person.qname: 2,
                           Device.qname: 3}
    _xml_item_type = (Service, PIDFNote, Person, Device)

    entity = XMLAttribute('entity', type=AnyURI, required=True, test_equal=True)

    # generators over the items of one exact type; they go through __iter__
    # below, where notes have already been converted with unicode()
    services = property(lambda self: (item for item in self if type(item) is Service))
    notes = property(lambda self: (item for item in self if type(item) is Note))
    persons = property(lambda self: (item for item in self if type(item) is Person))
    devices = property(lambda self: (item for item in self if type(item) is Device))

    def __init__(self, entity, elements=[]):
        # the default elements list is only iterated, never modified
        XMLListRootElement.__init__(self)
        self.entity = entity
        self.update(elements)

    def __contains__(self, item):
        if isinstance(item, Note):
            item = PIDFNote(item, item.lang)
        return super(PIDF, self).__contains__(item)

    def __iter__(self):
        return (unicode(item) if type(item) is PIDFNote else item for item in super(PIDF, self).__iter__())

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.entity, list(self))

    def add(self, item):
        if isinstance(item, Note):
            item = PIDFNote(item, item.lang)
        super(PIDF, self).add(item)

    def remove(self, item):
        """Remove item; a Note is matched against the stored PIDFNote elements and raises KeyError when absent."""
        if isinstance(item, Note):
            try:
                item = (entry for entry in super(PIDF, self).__iter__() if type(entry) is PIDFNote and unicode(entry) == item).next()
            except StopIteration:
                raise KeyError(item)
        super(PIDF, self).remove(item)
#
# Extensions
#
# Namespace of the AG Projects PIDF extension elements, registered with the 'agp-pidf' prefix
agp_pidf_namespace = 'urn:ag-projects:xml:ns:pidf'
PIDFDocument.register_namespace(agp_pidf_namespace, prefix='agp-pidf')
class ExtendedStatusValue(str):
    """String limited to the known extended status values."""

    def __new__(cls, value):
        if value in ('available', 'offline', 'away', 'extended-away', 'busy'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for extended status")
class ExtendedStatus(XMLStringElement, StatusExtension):
    """<extended> element in the agp-pidf namespace; its value type is ExtendedStatusValue."""
    _xml_tag = 'extended'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument
    _xml_value_type = ExtendedStatusValue


# registered on Status under the name 'extended'
Status.register_extension('extended', type=ExtendedStatus)
class Description(XMLStringElement):
    """<description> string element in the agp-pidf namespace."""
    _xml_tag = 'description'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument
class UserAgent(XMLStringElement):
    """<user-agent> string element in the agp-pidf namespace."""
    _xml_tag = 'user-agent'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument
class TimeOffset(XMLStringElement):
    """<time-offset> element in the agp-pidf namespace.

    The value is stored as a string and can be read back with int(); an
    optional description attribute is supported. When no value is given, the
    current UTC offset in minutes is used.
    """
    _xml_tag = 'time-offset'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument

    description = XMLAttribute('description', type=unicode, required=False, test_equal=True)

    def __init__(self, value=None, description=None):
        if value is None:
            # NOTE(review): assumes DateTime.now() is timezone aware, so that
            # utcoffset() returns a timedelta - confirm against ISOTimestamp
            offset = DateTime.now().utcoffset()
            # A negative timedelta is stored as days=-1 plus a positive number
            # of seconds, so .seconds alone is wrong west of UTC (UTC-5 would
            # give 1140 instead of -300). Use the full signed duration.
            value = (offset.days * 86400 + offset.seconds) // 60
        XMLStringElement.__init__(self, str(value))
        self.description = description

    def __int__(self):
        return int(self.value)
class DeviceInfo(XMLElement, ServiceExtension):
    """<device-info> element in the agp-pidf namespace.

    Has a required id and optional description, user_agent and time_offset
    children.
    """
    _xml_tag = 'device-info'
    _xml_namespace = agp_pidf_namespace
    _xml_document = PIDFDocument
    # NOTE(review): TimeOffset has no entry in the children order - confirm this is intended
    _xml_children_order = {Description.qname: 0,
                           UserAgent.qname: 1}

    id = XMLElementID('id', type=str, required=True, test_equal=True)
    description = XMLElementChild('description', type=Description, required=False, test_equal=True)
    user_agent = XMLElementChild('user_agent', type=UserAgent, required=False, test_equal=True)
    time_offset = XMLElementChild('time_offset', type=TimeOffset, required=False, test_equal=True)

    def __init__(self, id, description=None, user_agent=None, time_offset=None):
        XMLElement.__init__(self)
        self.id = id
        self.description = description
        self.user_agent = user_agent
        self.time_offset = time_offset

    def __repr__(self):
        return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.description, self.user_agent, self.time_offset)


# registered on Service under the name 'device_info'
Service.register_extension('device_info', type=DeviceInfo)
diff --git a/sipsimple/payloads/rpid.py b/sipsimple/payloads/rpid.py
index 5fd42f2c..905a3747 100644
--- a/sipsimple/payloads/rpid.py
+++ b/sipsimple/payloads/rpid.py
@@ -1,714 +1,714 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
RPID handling according to RFC4480
This module provides an extension to PIDF to support rich presence.
"""
__all__ = ['namespace',
'ActivityElement',
'MoodElement',
'PlaceTypeElement',
'PrivacyElement',
'RelationshipElement',
'ServiceClassElement',
'SphereElement',
'Note',
'Other',
'Activities',
'Mood',
'PlaceIs',
'AudioPlaceInformation',
'VideoPlaceInformation',
'TextPlaceInformation',
'PlaceType',
'AudioPrivacy',
'TextPrivacy',
'VideoPrivacy',
'Privacy',
'Relationship',
'ServiceClass',
'Sphere',
'StatusIcon',
'TimeOffset',
'UserInput',
'Class']
from lxml import etree
from sipsimple.payloads import ValidationError, XMLElementType, XMLEmptyElementRegistryType, XMLAttribute, XMLElementChild, XMLStringChoiceChild
from sipsimple.payloads import XMLElement, XMLEmptyElement, XMLStringElement, XMLLocalizedStringElement, XMLStringListElement
from sipsimple.payloads.pidf import PIDFDocument, ServiceExtension, PersonExtension, DeviceExtension, Note, NoteMap, NoteList, Service, Person, Device
from sipsimple.payloads.datatypes import UnsignedLong, DateTime, ID
# XML namespace of the RPID elements, registered on the PIDF document with the 'rpid' prefix
namespace = 'urn:ietf:params:xml:ns:pidf:rpid'
PIDFDocument.register_namespace(namespace, prefix='rpid', schema='rpid.xsd')
## Marker mixins
# Empty classes used only to tag other classes (for example as the
# _xml_item_extension_type of Activities and Mood below).
class ActivityElement(object): pass
class MoodElement(object): pass
class PlaceTypeElement(object): pass
class PrivacyElement(object): pass
class RelationshipElement(object): pass
class ServiceClassElement(object): pass
class SphereElement(object): pass
## Attribute value types
class AudioPlaceValue(str):
    """String limited to the allowed audio place-is values."""

    def __new__(cls, value):
        if value in ('noisy', 'ok', 'quiet', 'unknown'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for audio place-is")
class VideoPlaceValue(str):
    """String limited to the allowed video place-is values."""

    def __new__(cls, value):
        if value in ('toobright', 'ok', 'dark', 'unknown'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for video place-is")
class TextPlaceValue(str):
    """String limited to the allowed text place-is values."""

    def __new__(cls, value):
        if value in ('uncomfortable', 'inappropriate', 'ok', 'unknown'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for text place-is")
class UserInputValue(str):
    """String limited to the allowed user-input values: 'active' and 'idle'."""

    def __new__(cls, value):
        if value in ('active', 'idle'):
            return str.__new__(cls, value)
        raise ValueError("illegal value for user-input")
## Elements
class RPIDNote(XMLLocalizedStringElement):
    """<note> element in the RPID namespace."""

    _xml_tag = 'note'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        return Note(self.value, self.lang)

    @classmethod
    def from_string(cls, value):
        """Build the element from a Note (keeping its lang) or from a plain string."""
        # Note is tested first because it is also a basestring
        if isinstance(value, Note):
            return cls(value, value.lang)
        if isinstance(value, basestring):
            return cls(value)
        raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__)
class RPIDOther(XMLLocalizedStringElement):
    """<other> element in the RPID namespace."""

    _xml_tag = 'other'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __unicode__(self):
        return Other(self.value, self.lang)

    @classmethod
    def from_string(cls, value):
        """Build the element from an Other (keeping its lang) or from a plain string."""
        # Other is tested first because it is also a basestring
        if isinstance(value, Other):
            return cls(value, value.lang)
        if isinstance(value, basestring):
            return cls(value)
        raise ValueError("expected str/unicode instance, got %s instead" % value.__class__.__name__)
# Note subclass returned by RPIDOther.__unicode__, so that "other" values can be told apart from notes by type
class Other(Note): pass
class ActivityRegistry(object):
    """Registry of the empty elements that can appear inside <activities>.

    The names follow the activity values defined by RFC 4480; 'lunch' was
    missing from the list and has been added.
    """
    __metaclass__ = XMLEmptyElementRegistryType

    _xml_namespace = namespace
    _xml_document = PIDFDocument

    names = ('appointment', 'away', 'breakfast', 'busy', 'dinner',
             'holiday', 'in-transit', 'looking-for-work', 'lunch', 'meal',
             'meeting', 'on-the-phone', 'performance', 'permanent-absence',
             'playing', 'presentation', 'shopping', 'sleeping', 'spectator',
             'steering', 'travel', 'tv', 'vacation', 'working', 'worship',
             'unknown')
class Activities(XMLStringListElement, PersonExtension):
    """<activities> element in the RPID namespace.

    Holds activity items (registry elements or RPIDOther values), optional id,
    since ('from' in XML) and until attributes and a collection of notes.
    """
    _xml_tag = 'activities'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_children_order = {RPIDNote.qname: 0}
    _xml_item_registry = ActivityRegistry
    _xml_item_other_type = RPIDOther
    _xml_item_extension_type = ActivityElement

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id=None, since=None, until=None, activities=[], notes=[]):
        # the default lists are only iterated, never modified
        XMLElement.__init__(self)
        self.id = id
        self.since = since
        self.until = until
        self.update(activities)
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, Activities):
            return super(Activities, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes))

    def _parse_element(self, element):
        super(Activities, self)._parse_element(element)
        self.notes._parse_element(element)

    def _build_element(self):
        super(Activities, self)._build_element()
        self.notes._build_element()

    def add(self, activity):
        """Add an activity given as a string or as an element.

        A string naming a registry entry becomes that registry element, any
        other string becomes an RPIDOther value.
        """
        if isinstance(activity, basestring):
            if activity in self._xml_item_registry.names:
                activity = self._xml_item_registry.class_map[activity]()
            else:
                activity = self._xml_item_other_type.from_string(activity)
        unknown_activity = self._xml_item_registry.class_map['unknown']()
        # 'unknown' does not coexist with other values: existing items are
        # cleared when 'unknown' is added or is already present
        if activity == unknown_activity or unknown_activity in self._element_map.itervalues():
            self.clear()
        super(Activities, self).add(activity)

    def check_validity(self):
        """Raise ValidationError when there are no activities, then run the base checks."""
        if not self:
            raise ValidationError("Activity element must have at least one value")
        super(Activities, self).check_validity()


# registered on Person under the name 'activities'
Person.register_extension('activities', type=Activities)
class MoodRegistry(object):
    """Registry of the empty elements that can appear inside <mood>.

    The names follow the mood values defined by RFC 4480. Two entries were
    misspelled ('contended', 'invisible') and are now the RFC names
    'contented' and 'invincible'.
    """
    __metaclass__ = XMLEmptyElementRegistryType

    _xml_namespace = namespace
    _xml_document = PIDFDocument

    names = ('afraid', 'amazed', 'angry', 'annoyed', 'anxious', 'ashamed',
             'bored', 'brave', 'calm', 'cold', 'confused', 'contented',
             'cranky', 'curious', 'depressed', 'disappointed', 'disgusted',
             'distracted', 'embarrassed', 'excited', 'flirtatious',
             'frustrated', 'grumpy', 'guilty', 'happy', 'hot', 'humbled',
             'humiliated', 'hungry', 'hurt', 'impressed', 'in_awe', 'in_love',
             'indignant', 'interested', 'invincible', 'jealous', 'lonely',
             'mean', 'moody', 'nervous', 'neutral', 'offended', 'playful',
             'proud', 'relieved', 'remorseful', 'restless', 'sad',
             'sarcastic', 'serious', 'shocked', 'shy', 'sick', 'sleepy',
             'stressed', 'surprised', 'thirsty', 'worried', 'unknown')
class Mood(XMLStringListElement, PersonExtension):
    """<mood> element in the RPID namespace.

    Holds mood items (registry elements or RPIDOther values), optional id,
    since ('from' in XML) and until attributes and a collection of notes.
    """
    _xml_tag = 'mood'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_extension_type = MoodElement
    _xml_children_order = {RPIDNote.qname: 0}
    _xml_item_registry = MoodRegistry
    _xml_item_other_type = RPIDOther
    _xml_item_extension_type = MoodElement

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id=None, since=None, until=None, moods=[], notes=[]):
        # the default lists are only iterated, never modified
        XMLElement.__init__(self)
        self.id = id
        self.since = since
        self.until = until
        self.update(moods)
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, Mood):
            return super(Mood, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, list(self), list(self.notes))

    def _parse_element(self, element):
        super(Mood, self)._parse_element(element)
        self.notes._parse_element(element)

    def _build_element(self):
        super(Mood, self)._build_element()
        self.notes._build_element()

    def add(self, mood):
        """Add a mood given as a string or as an element.

        A string naming a registry entry becomes that registry element, any
        other string becomes an RPIDOther value.
        """
        if isinstance(mood, basestring):
            if mood in self._xml_item_registry.names:
                mood = self._xml_item_registry.class_map[mood]()
            else:
                mood = self._xml_item_other_type.from_string(mood)
        unknown_mood = self._xml_item_registry.class_map['unknown']()
        # 'unknown' does not coexist with other values: existing items are
        # cleared when 'unknown' is added or is already present
        if mood == unknown_mood or unknown_mood in self._element_map.itervalues():
            self.clear()
        super(Mood, self).add(mood)

    def check_validity(self):
        """Raise ValidationError when there are no moods, then run the base checks."""
        if not self:
            raise ValidationError("Mood element must have at least one value")
        super(Mood, self).check_validity()


# registered on Person under the name 'mood'
Person.register_extension('mood', type=Mood)
class AudioPlaceInformation(XMLStringElement):
    """<audio> element in the RPID namespace; its value type is AudioPlaceValue."""
    _xml_tag = 'audio'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_value_type = AudioPlaceValue
class VideoPlaceInformation(XMLStringElement):
    """<video> element in the RPID namespace; its value type is VideoPlaceValue."""
    _xml_tag = 'video'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_value_type = VideoPlaceValue
class TextPlaceInformation(XMLStringElement):
    """<text> element in the RPID namespace; its value type is TextPlaceValue."""
    _xml_tag = 'text'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_value_type = TextPlaceValue
class PlaceIs(XMLElement, PersonExtension):
    """<place-is> element in the RPID namespace.

    Has optional id, since ('from' in XML) and until attributes, optional
    audio, video and text children and a collection of notes.
    """
    _xml_tag = 'place-is'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_children_order = {RPIDNote.qname: 0,
                           AudioPlaceInformation.qname: 1,
                           VideoPlaceInformation.qname: 2,
                           TextPlaceInformation.qname: 3}

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    audio = XMLElementChild('audio', type=AudioPlaceInformation, required=False, test_equal=True)
    video = XMLElementChild('video', type=VideoPlaceInformation, required=False, test_equal=True)
    text = XMLElementChild('text', type=TextPlaceInformation, required=False, test_equal=True)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id=None, since=None, until=None, audio=None, video=None, text=None, notes=[]):
        # the default notes list is only iterated, never modified
        XMLElement.__init__(self)
        self.id = id
        self.since = since
        self.until = until
        self.audio = audio
        self.video = video
        self.text = text
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, PlaceIs):
            return super(PlaceIs, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, self.audio, self.video, self.text, list(self.notes))

    # NOTE(review): unlike Activities and Mood, the two hooks below do not call
    # the base class implementation - confirm this is intended
    def _parse_element(self, element):
        self.notes._parse_element(element)

    def _build_element(self):
        self.notes._build_element()


# registered on Person under the name 'place_is'
Person.register_extension('place_is', type=PlaceIs)
class PlaceType(XMLElement, PersonExtension):
    """<place-type> element in the RPID namespace.

    Has optional id, since ('from' in XML) and until attributes, a single
    value (set from the placetype constructor argument) and a collection of
    notes.
    """
    _xml_tag = 'place-type'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_children_order = {RPIDNote.qname: 0}

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    value = XMLStringChoiceChild('value', other_type=RPIDOther, extension_type=PlaceTypeElement)

    # per-instance storage for the notes, accessed through NoteList
    _note_map = NoteMap()

    def __init__(self, id=None, since=None, until=None, placetype=None, notes=[]):
        # the default notes list is only iterated, never modified
        super(PlaceType, self).__init__()
        self.id = id
        self.since = since
        self.until = until
        self.value = placetype
        self.notes.update(notes)

    @property
    def notes(self):
        # a new NoteList view is created on every access
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, PlaceType):
            return super(PlaceType, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.since, self.until, self.value, list(self.notes))

    # NOTE(review): unlike Activities and Mood, the two hooks below do not call
    # the base class implementation - confirm this is intended
    def _parse_element(self, element):
        self.notes._parse_element(element)

    def _build_element(self):
        self.notes._build_element()


# registered on Person under the name 'place_type'
Person.register_extension('place_type', type=PlaceType)
class AudioPrivacy(XMLEmptyElement):
    """Empty <audio> element; constructing it with a false flag yields None."""

    _xml_tag = 'audio'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __new__(cls, private=True):
        # a false flag means "no element at all", so no instance is created
        return XMLEmptyElement.__new__(cls) if private else None

    def __init__(self, private=True):
        XMLEmptyElement.__init__(self)
class TextPrivacy(XMLEmptyElement):
    """Empty <text> element; constructing it with a false flag yields None."""

    _xml_tag = 'text'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __new__(cls, private=True):
        # a false flag means "no element at all", so no instance is created
        return XMLEmptyElement.__new__(cls) if private else None

    def __init__(self, private=True):
        XMLEmptyElement.__init__(self)
class VideoPrivacy(XMLEmptyElement):
    """Empty <video> element; constructing it with a false flag yields None."""

    _xml_tag = 'video'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    def __new__(cls, private=True):
        # a false flag means "no element at all", so no instance is created
        return XMLEmptyElement.__new__(cls) if private else None

    def __init__(self, private=True):
        XMLEmptyElement.__init__(self)
class PrivacyType(XMLElementType):
    """Metaclass that records which element children of a class are privacy flags."""

    def __init__(cls, name, bases, dct):
        super(PrivacyType, cls).__init__(name, bases, dct)
        privacy_names = []
        for attr_name in dir(cls):
            child = getattr(cls, attr_name)
            # exact type match: subclasses of XMLElementChild are deliberately not considered
            if type(child) is not XMLElementChild:
                continue
            if child.name in ('audio', 'text', 'video') or issubclass(child.type, PrivacyElement):
                privacy_names.append(child.name)
        cls._privacy_attributes = tuple(privacy_names)
class Privacy(XMLElement, PersonExtension):
    """The <privacy> person extension: audio/text/video flags plus optional notes."""

    __metaclass__ = PrivacyType

    _xml_tag = 'privacy'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_children_order = {RPIDNote.qname: 0,
                           AudioPrivacy.qname: 1,
                           TextPrivacy.qname: 2,
                           VideoPrivacy.qname: 3}

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    # exposed as `since` in Python, serialized as the `from` XML attribute
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)

    audio = XMLElementChild('audio', type=AudioPrivacy, required=False, test_equal=True)
    text = XMLElementChild('text', type=TextPrivacy, required=False, test_equal=True)
    video = XMLElementChild('video', type=VideoPrivacy, required=False, test_equal=True)

    @property
    def unknown(self):
        """True when none of the privacy attributes collected by the metaclass is set."""
        return all(getattr(self, name) is None for name in self._privacy_attributes)

    _note_map = NoteMap()

    def __init__(self, id=None, since=None, until=None, notes=[], audio=False, text=False, video=False):
        super(Privacy, self).__init__()
        self.id = id
        self.since = since
        self.until = until
        self.audio = audio
        self.text = text
        self.video = video
        self.notes.update(notes)

    @property
    def notes(self):
        """A NoteList view over this element's RPIDNote children."""
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if not isinstance(other, Privacy):
            return NotImplemented
        return super(Privacy, self).__eq__(other) and self.notes == other.notes

    def __repr__(self):
        fields = (self.__class__.__name__, self.id, self.since, self.until, list(self.notes),
                  self.audio, self.text, self.video)
        return '%s(%r, %r, %r, %r, %r, %r, %r)' % fields

    def _parse_element(self, element):
        self.notes._parse_element(element)

    def _build_element(self):
        # keep an <unknown/> child present exactly when no privacy flag is set
        unknown_tag = '{%s}unknown' % self._xml_namespace
        unknown_element = self.element.find(unknown_tag)
        if self.unknown:
            if unknown_element is None:
                etree.SubElement(self.element, unknown_tag, nsmap=self._xml_document.nsmap)
        elif unknown_element is not None:
            self.element.remove(unknown_element)
        self.notes._build_element()


Person.register_extension('privacy', type=Privacy)
class RelationshipRegistry(object):
    # Names of the empty elements accepted as a <relationship> value.
    __metaclass__ = XMLEmptyElementRegistryType

    _xml_namespace = namespace
    _xml_document = PIDFDocument

    names = (
        'assistant',
        'associate',
        'family',
        'friend',
        'self',
        'supervisor',
        'unknown',
    )
class Relationship(XMLElement, ServiceExtension):
    """The <relationship> service extension: a choice value plus optional notes."""

    _xml_tag = 'relationship'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    # The ordering map is keyed by qualified tag name, as in the other
    # note-carrying elements of this module; keying it by the RPIDNote class
    # itself would never match a child's qname.
    _xml_children_order = {RPIDNote.qname: 0}

    value = XMLStringChoiceChild('value', registry=RelationshipRegistry, other_type=RPIDOther, extension_type=RelationshipElement)

    _note_map = NoteMap()

    def __init__(self, relationship='self', notes=[]):
        XMLElement.__init__(self)
        self.value = relationship
        self.notes.update(notes)

    @property
    def notes(self):
        """A NoteList view over this element's RPIDNote children."""
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if isinstance(other, Relationship):
            return super(Relationship, self).__eq__(other) and self.notes == other.notes
        else:
            return NotImplemented

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.value, list(self.notes))

    def _parse_element(self, element):
        # notes are parsed by the NoteList helper
        self.notes._parse_element(element)

    def _build_element(self):
        # notes are serialized by the NoteList helper
        self.notes._build_element()


Service.register_extension('relationship', type=Relationship)
class ServiceClassRegistry(object):
    # Names of the empty elements accepted as a <service-class> value.
    __metaclass__ = XMLEmptyElementRegistryType

    _xml_namespace = namespace
    _xml_document = PIDFDocument

    names = (
        'courier',
        'electronic',
        'freight',
        'in-person',
        'postal',
        'unknown',
    )
class ServiceClass(XMLElement, ServiceExtension):
    """The <service-class> service extension: a choice value plus optional notes."""

    _xml_tag = 'service-class'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    value = XMLStringChoiceChild('value', registry=ServiceClassRegistry, extension_type=ServiceClassElement)

    _note_map = NoteMap()

    def __init__(self, service_class=None, notes=[]):
        XMLElement.__init__(self)
        self.value = service_class
        self.notes.update(notes)

    @property
    def notes(self):
        """A NoteList view over this element's RPIDNote children."""
        return NoteList(self, RPIDNote)

    def __eq__(self, other):
        if not isinstance(other, ServiceClass):
            return NotImplemented
        return super(ServiceClass, self).__eq__(other) and self.notes == other.notes

    def __repr__(self):
        fields = (self.__class__.__name__, self.value, list(self.notes))
        return '%s(%r, %r)' % fields

    def _parse_element(self, element):
        self.notes._parse_element(element)

    def _build_element(self):
        self.notes._build_element()


Service.register_extension('service_class', type=ServiceClass)
class SphereRegistry(object):
    # Names of the empty elements accepted as a <sphere> value.
    __metaclass__ = XMLEmptyElementRegistryType

    _xml_namespace = namespace
    _xml_document = PIDFDocument

    names = (
        'home',
        'work',
        'unknown',
    )
class Sphere(XMLElement, PersonExtension):
    """The <sphere> person extension with optional id and validity interval."""

    _xml_tag = 'sphere'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    id = XMLAttribute('id', type=ID, required=False, test_equal=True)
    # exposed as `since` in Python, serialized as the `from` XML attribute
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    value = XMLStringChoiceChild('value', registry=SphereRegistry, extension_type=SphereElement)

    def __init__(self, value=None, id=None, since=None, until=None):
        XMLElement.__init__(self)
        self.id = id
        self.since = since
        self.until = until
        self.value = value

    def __repr__(self):
        fields = (self.__class__.__name__, self.value, self.id, self.since, self.until)
        return '%s(%r, %r, %r, %r)' % fields


Person.register_extension('sphere', type=Sphere)
class StatusIcon(XMLStringElement, ServiceExtension, PersonExtension):
    """The <status-icon> string element, registered on both Person and Service."""

    _xml_tag = 'status-icon'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    # exposed as `since` in Python, serialized as the `from` XML attribute
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)

    def __init__(self, value=None, id=None, since=None, until=None):
        XMLStringElement.__init__(self, value)
        self.id = id
        self.since = since
        self.until = until


Person.register_extension('status_icon', type=StatusIcon)
Service.register_extension('status_icon', type=StatusIcon)
class TimeOffset(XMLStringElement, PersonExtension):
    """The <time-offset> person extension; its text is an offset from UTC in minutes.

    When no value is given, the current local UTC offset is used.
    """

    _xml_tag = 'time-offset'
    _xml_namespace = namespace
    _xml_document = PIDFDocument

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    # exposed as `since` in Python, serialized as the `from` XML attribute
    since = XMLAttribute('since', xmlname='from', type=DateTime, required=False, test_equal=True)
    until = XMLAttribute('until', type=DateTime, required=False, test_equal=True)
    description = XMLAttribute('description', type=str, required=False, test_equal=True)

    def __init__(self, value=None, id=None, since=None, until=None, description=None):
        if value is None:
            offset = DateTime.now().utcoffset()
            # A negative timedelta is normalized to days=-1 plus a positive
            # number of seconds, so `.seconds` alone is wrong west of UTC
            # (UTC-5 would give 1140 instead of -300). Fold the days back in.
            value = (offset.days * 86400 + offset.seconds) // 60
        XMLStringElement.__init__(self, str(value))
        self.id = id
        self.since = since
        self.until = until
        self.description = description

    def __int__(self):
        """Return the stored offset as an integer."""
        return int(self.value)


Person.register_extension('time_offset', type=TimeOffset)
class UserInput(XMLStringElement, ServiceExtension, PersonExtension, DeviceExtension):
    """The <user-input> string element, registered on Service, Person and Device."""

    _xml_tag = 'user-input'
    _xml_namespace = namespace
    _xml_document = PIDFDocument
    _xml_value_type = UserInputValue

    id = XMLAttribute('id', type=str, required=False, test_equal=True)
    # Python names use underscores; the XML attribute names use hyphens
    last_input = XMLAttribute('last_input', xmlname='last-input', type=DateTime, required=False, test_equal=True)
    idle_threshold = XMLAttribute('idle_threshold', xmlname='idle-threshold', type=UnsignedLong, required=False, test_equal=True)

    def __init__(self, value='active', id=None, last_input=None, idle_threshold=None):
        XMLStringElement.__init__(self, value)
        self.id = id
        self.last_input = last_input
        self.idle_threshold = idle_threshold


Service.register_extension('user_input', type=UserInput)
Person.register_extension('user_input', type=UserInput)
Device.register_extension('user_input', type=UserInput)
class Class(XMLStringElement, ServiceExtension, PersonExtension, DeviceExtension):
    """The <class> string element, registered as `rpid_class` on Service, Person and Device."""

    _xml_tag = 'class'
    _xml_namespace = namespace
    _xml_document = PIDFDocument


Service.register_extension('rpid_class', type=Class)
Person.register_extension('rpid_class', type=Class)
Device.register_extension('rpid_class', type=Class)
diff --git a/sipsimple/streams/applications/chat.py b/sipsimple/streams/applications/chat.py
index ce1b088f..996c5d87 100644
--- a/sipsimple/streams/applications/chat.py
+++ b/sipsimple/streams/applications/chat.py
@@ -1,252 +1,252 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Chat related objects, including CPIM support as defined in RFC3862"""
__all__ = ['ChatIdentity', 'ChatMessage', 'CPIMParserError', 'CPIMIdentity', 'CPIMHeader', 'CPIMMessage']
import codecs
import re
from email.message import Message
from email.parser import Parser
from types import NoneType
from sipsimple.core import SIPURI, BaseSIPURI
-from sipsimple.util import MultilingualText, Timestamp
+from sipsimple.util import MultilingualText, ISOTimestamp
class ChatIdentity(object):
    """A chat participant: a URI with an optional display name.

    Equality only looks at the user and host parts of the URI.
    """

    def __init__(self, uri, display_name=None):
        self.uri = uri
        self.display_name = display_name

    def __eq__(self, other):
        if isinstance(other, ChatIdentity):
            other_uri = other.uri
        elif isinstance(other, BaseSIPURI):
            other_uri = other
        elif isinstance(other, basestring):
            # strings are compared after parsing; unparsable ones are simply unequal
            try:
                other_uri = SIPURI.parse(other)
            except Exception:
                return False
        else:
            return NotImplemented
        return self.uri.user == other_uri.user and self.uri.host == other_uri.host

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result

    def __unicode__(self):
        if not self.display_name:
            return u'<%s>' % self.uri
        return u'%s <%s>' % (self.display_name, self.uri)
class ChatMessage(object):
    """Plain container for a chat message body and its metadata."""

    def __init__(self, body, content_type, sender=None, recipient=None, timestamp=None):
        self.body = body
        self.content_type = content_type
        self.sender = sender
        # a single optional recipient is stored as a (possibly empty) list
        if recipient is None:
            self.recipients = []
        else:
            self.recipients = [recipient]
        self.courtesy_recipients = []
        self.subject = None
        self.timestamp = timestamp
        self.required = []
        self.additional_headers = []
## CPIM support
class CPIMParserError(Exception):
    """Error raised when a CPIM message cannot be parsed."""
class CPIMCodec(codecs.Codec):
    """Codec for CPIM header text: UTF-8 with backslash escapes for control characters."""

    # code points 0-31 and 127 are written as \uXXXX escapes; a literal backslash is doubled
    character_map = dict((code, u'\\u%04x' % code) for code in list(range(32)) + [127])
    character_map[ord(u'\\')] = u'\\\\'

    @classmethod
    def encode(cls, input, errors='strict'):
        """Escape `input` and return (utf-8 bytes, number of input characters)."""
        escaped = input.translate(cls.character_map)
        return escaped.encode('utf-8', errors), len(input)

    @classmethod
    def decode(cls, input, errors='strict'):
        """Decode utf-8 bytes, resolve the escapes and return (text, number of input bytes)."""
        text = input.decode('utf-8', errors)
        # round-trip through raw-unicode-escape so that unicode-escape can interpret the escapes
        raw = text.encode('raw-unicode-escape', errors)
        return raw.decode('unicode-escape', errors), len(input)
def cpim_codec_search(name):
    """Codec search function: resolve the CPIM header codec by name, else return None."""
    if name.lower() not in ('cpim-headers', 'cpim_headers'):
        return None
    return codecs.CodecInfo(
        name='CPIM-headers',
        encode=CPIMCodec.encode,
        decode=CPIMCodec.decode,
        incrementalencoder=codecs.IncrementalEncoder,
        incrementaldecoder=codecs.IncrementalDecoder,
        streamwriter=codecs.StreamWriter,
        streamreader=codecs.StreamReader,
    )


# register the lookup once, then drop the helper from the module namespace
codecs.register(cpim_codec_search)
del cpim_codec_search
class Namespace(unicode):
    """A namespace URI string that also carries its header prefix."""

    def __new__(cls, value, prefix=''):
        instance = unicode.__new__(cls, value)
        instance.prefix = prefix
        return instance
class CPIMHeader(object):
    """A single CPIM header: name, namespace and value."""

    def __init__(self, name, namespace, value):
        self.name, self.namespace, self.value = name, namespace, value
class CPIMIdentity(ChatIdentity):
    """A ChatIdentity that can be parsed from a message/cpim identity header value."""

    _re_format = re.compile(r'^("?(?P<display_name>[^<]*[^"\s])"?)?\s*<(?P<uri>sips?:.+)>$')

    @classmethod
    def parse(cls, value):
        """Build an identity from a header value; raise ValueError if it does not match."""
        if isinstance(value, str):
            value = value.decode('cpim-headers')
        match = cls._re_format.match(value)
        if match is None:
            raise ValueError('Cannot parse message/cpim identity header value: %r' % value)
        display_name = match.group('display_name')
        # FIXME: SIPURI is not unicode friendly and expects a str. -Luci
        uri = SIPURI.parse(str(match.group('uri')))
        return cls(uri, display_name)
class CPIMMessage(ChatMessage):
    """A chat message that serializes to, and parses from, message/cpim text."""

    standard_namespace = u'urn:ietf:params:cpim-headers:'

    # "[prefix.]name: value\r\n"
    headers_re = re.compile(r'(?:([^:]+?)\.)?(.+?):\s*(.+?)\r\n')
    # optional ";lang=xx " tag followed by the subject text
    subject_re = re.compile(r'^(?:;lang=([a-z]{1,8}(?:-[a-z0-9]{1,8})*)\s+)?(.*)$')
    # optional prefix followed by "<uri>"
    namespace_re = re.compile(r'^(?:(\S+) ?)?<(.*)>$')

    def __init__(self, body, content_type, sender=None, recipients=None, courtesy_recipients=None,
                 subject=None, timestamp=None, required=None, additional_headers=None):
        self.body = body
        self.content_type = content_type
        self.sender = sender
        self.recipients = recipients if recipients is not None else []
        self.courtesy_recipients = courtesy_recipients if courtesy_recipients is not None else []
        self.subject = subject if isinstance(subject, (MultilingualText, NoneType)) else MultilingualText(subject)
        self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None
        self.required = required if required is not None else []
        self.additional_headers = additional_headers if additional_headers is not None else []

    def __str__(self):
        """Serialize the message: CPIM headers, a blank line, then the MIME-wrapped body."""
        headers = []
        if self.sender:
            headers.append(u'From: %s' % self.sender)
        for recipient in self.recipients:
            headers.append(u'To: %s' % recipient)
        for recipient in self.courtesy_recipients:
            headers.append(u'cc: %s' % recipient)
        if self.subject:
            headers.append(u'Subject: %s' % self.subject)
        if self.subject is not None:
            for lang, translation in self.subject.translations.iteritems():
                headers.append(u'Subject:;lang=%s %s' % (lang, translation))
        if self.timestamp:
            headers.append(u'DateTime: %s' % self.timestamp)
        if self.required:
            headers.append(u'Required: %s' % ','.join(self.required))
        # emit an NS header whenever a prefix is not yet bound to the header's namespace
        namespaces = {u'': self.standard_namespace}
        for header in self.additional_headers:
            if namespaces.get(header.namespace.prefix, None) != header.namespace:
                if header.namespace.prefix:
                    headers.append(u'NS: %s <%s>' % (header.namespace.prefix, header.namespace))
                else:
                    headers.append(u'NS: <%s>' % header.namespace)
                namespaces[header.namespace.prefix] = header.namespace
            if header.namespace.prefix:
                headers.append(u'%s.%s: %s' % (header.namespace.prefix, header.name, header.value))
            else:
                headers.append(u'%s: %s' % (header.name, header.value))
        # trailing empty entry so the header section ends with a line break
        headers.append(u'')
        headers = '\r\n'.join(s.encode('cpim-headers') for s in headers)

        message = Message()
        message.set_type(self.content_type)
        if isinstance(self.body, unicode):
            message.set_param('charset', 'utf-8')
            message.set_payload(self.body.encode('utf-8'))
        else:
            message.set_payload(self.body)

        return headers + '\r\n' + message.as_string()

    @classmethod
    def parse(cls, string):
        """Parse message/cpim text into a message object.

        Raises CPIMParserError when the blank line separating headers from
        content is missing. Individual headers whose value fails to parse
        (ValueError) are skipped rather than aborting the whole parse.
        """
        message = cls('', None)

        try:
            headers_end = string.index('\r\n\r\n')
        except ValueError:
            raise CPIMParserError('Invalid CPIM message')
        else:
            headers = cls.headers_re.findall(buffer(string, 0, headers_end+2))
            body = buffer(string, headers_end+4)

        namespaces = {u'': Namespace(cls.standard_namespace, u'')}
        subjects = {}
        for prefix, name, value in headers:
            if '.' in name:
                continue
            namespace = namespaces.get(prefix)
            if not namespace:
                # header uses a prefix that no NS header has declared
                continue
            standard = namespace == cls.standard_namespace
            try:
                value = value.decode('cpim-headers')
                if name == 'From' and standard:
                    message.sender = CPIMIdentity.parse(value)
                elif name == 'To' and standard:
                    message.recipients.append(CPIMIdentity.parse(value))
                elif name == 'cc' and standard:
                    message.courtesy_recipients.append(CPIMIdentity.parse(value))
                elif name == 'Subject' and standard:
                    match = cls.subject_re.match(value)
                    if match is None:
                        raise ValueError('Illegal Subject header: %r' % value)
                    lang, subject = match.groups()
                    # language tags must be ASCII
                    subjects[str(lang) if lang is not None else None] = subject
                elif name == 'DateTime' and standard:
                    message.timestamp = ISOTimestamp(value)
                elif name == 'Required' and standard:
                    message.required.extend(re.split(r'\s*,\s*', value))
                elif name == 'NS' and standard:
                    match = cls.namespace_re.match(value)
                    if match is None:
                        raise ValueError('Illegal NS header: %r' % value)
                    prefix, uri = match.groups()
                    namespaces[prefix] = Namespace(uri, prefix)
                else:
                    message.additional_headers.append(CPIMHeader(name, namespace, value))
            except ValueError:
                # best effort: a malformed header is ignored
                pass

        # the untagged subject (key None) is the default text, the rest are translations
        if None in subjects:
            message.subject = MultilingualText(subjects.pop(None), **subjects)
        else:
            message.subject = MultilingualText(**subjects)

        mime_message = Parser().parsestr(body)
        message.content_type = mime_message.get_content_type()
        # Check before first use: placed after the startswith() call below,
        # this guard could never fire.
        if message.content_type is None:
            raise CPIMParserError("CPIM message missing Content-Type MIME header")
        if message.content_type.startswith('multipart/') or message.content_type == 'message/rfc822':
            message.body = mime_message.get_payload()
        elif message.content_type.startswith('text/'):
            message.body = mime_message.get_payload().decode(mime_message.get_content_charset() or 'utf-8')
        else:
            message.body = mime_message.get_payload()
        return message
diff --git a/sipsimple/util.py b/sipsimple/util.py
index 1380328e..6ad8aa39 100644
--- a/sipsimple/util.py
+++ b/sipsimple/util.py
@@ -1,145 +1,105 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Implements utilities commonly used in various parts of the library"""
from __future__ import absolute_import
-__all__ = ["All", "Any", "MultilingualText", "Timestamp", "user_info"]
+__all__ = ["All", "Any", "ISOTimestamp", "MultilingualText", "user_info"]
import os
import platform
-import re
import sys
+import dateutil.parser
from application.python.types import Singleton, MarkerType
from datetime import datetime
-from dateutil.tz import tzoffset
+from dateutil.tz import tzlocal, tzutc
# Utility classes
#
class All(object):
    # marker object; behaviour comes entirely from the MarkerType metaclass
    __metaclass__ = MarkerType
class Any(object):
    # marker object; behaviour comes entirely from the MarkerType metaclass
    __metaclass__ = MarkerType
class ISOTimestamp(datetime):
    """A datetime whose string form is its ISO format.

    It can be built from another ISOTimestamp (returned as is), from a string
    (parsed with dateutil), from a datetime, or from the usual datetime
    constructor arguments.
    """

    def __new__(cls, *args, **kw):
        if len(args) != 1:
            # regular datetime-style construction; default to the local
            # timezone unless a tzinfo was given positionally or by keyword
            if len(args) < 8 and 'tzinfo' not in kw:
                kw['tzinfo'] = tzlocal()
            return datetime.__new__(cls, *args, **kw)

        value = args[0]
        if isinstance(value, cls):
            return value
        if isinstance(value, basestring):
            parsed = dateutil.parser.parse(value)
            # the parsed tzinfo is passed through unchanged
            return cls(parsed.year, parsed.month, parsed.day, parsed.hour, parsed.minute,
                       parsed.second, parsed.microsecond, parsed.tzinfo)
        if isinstance(value, datetime):
            # a datetime without tzinfo is given the local timezone
            return cls(value.year, value.month, value.day, value.hour, value.minute,
                       value.second, value.microsecond, value.tzinfo or tzlocal())
        return datetime.__new__(cls, *args, **kw)

    def __str__(self):
        return self.isoformat()

    @classmethod
    def now(cls):
        """Return the current time in the local timezone."""
        return cls(datetime.now(tzlocal()))

    @classmethod
    def utcnow(cls):
        """Return the current time in UTC, with tzinfo set."""
        return cls(datetime.now(tzutc()))
class MultilingualText(unicode):
    """A unicode string carrying translations keyed by language code.

    The string value is the single positional argument if given, otherwise
    the 'en' translation, otherwise the empty string.
    """

    def __new__(cls, *args, **translations):
        if len(args) > 1:
            raise TypeError("%s.__new__ takes at most 1 positional argument (%d given)" % (cls.__name__, len(args)))
        if args:
            default = args[0]
        else:
            default = translations.get('en', u'')
        instance = unicode.__new__(cls, default)
        instance.translations = translations
        return instance

    def get_translation(self, language):
        """Return the translation for `language`, falling back to the string itself."""
        return self.translations.get(language, self)
-class Timestamp(datetime):
- _timestamp_re = re.compile(r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})T(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})(\.(?P<secfrac>\d{1,}))?((?P<UTC>Z)|((?P<tzsign>\+|-)(?P<tzhour>\d{2}):(?P<tzminute>\d{2})))')
-
- @classmethod
- def utc_offset(cls):
- timediff = datetime.now() - datetime.utcnow()
- return int(round((timediff.days*86400 + timediff.seconds + timediff.microseconds/1000000.0)/60))
-
- @classmethod
- def parse(cls, stamp):
- if stamp is None:
- return None
- match = cls._timestamp_re.match(stamp)
- if match is None:
- raise ValueError("Timestamp %s is not in RFC3339 format" % stamp)
- dct = match.groupdict()
- if dct['UTC'] is not None:
- secoffset = 0
- else:
- secoffset = int(dct['tzminute'])*60 + int(dct['tzhour'])*3600
- if dct['tzsign'] == '-':
- secoffset *= -1
- tzinfo = tzoffset(None, secoffset)
- if dct['secfrac'] is not None:
- secfrac = dct['secfrac'][:6]
- secfrac += '0'*(6-len(secfrac))
- secfrac = int(secfrac)
- else:
- secfrac = 0
- dt = datetime(int(dct['year']), month=int(dct['month']), day=int(dct['day']),
- hour=int(dct['hour']), minute=int(dct['minute']), second=int(dct['second']),
- microsecond=secfrac, tzinfo=tzinfo)
- return cls(dt)
-
- @classmethod
- def format(cls, dt):
- if dt is None:
- return None
- if dt.tzinfo is not None:
- return dt.replace(microsecond=0).isoformat()
- minutes = cls.utc_offset()
- if minutes == 0:
- tzspec = 'Z'
- else:
- if minutes < 0:
- sign = '-'
- minutes *= -1
- else:
- sign = '+'
- hours = minutes / 60
- minutes = minutes % 60
- tzspec = '%s%02d:%02d' % (sign, hours, minutes)
- return dt.replace(microsecond=0).isoformat()+tzspec
-
- def __new__(cls, value, *args, **kwargs):
- if isinstance(value, cls):
- return value
- elif isinstance(value, datetime):
- return cls(value.year, month=value.month, day=value.day,
- hour=value.hour, minute=value.minute, second=value.second,
- microsecond=value.microsecond, tzinfo=value.tzinfo)
- elif isinstance(value, basestring):
- return cls.parse(value)
- else:
- return datetime.__new__(cls, value, *args, **kwargs)
-
- def __str__(self):
- return self.format(self)
-
-
# Utility objects
#
class UserInfo(object):
    """Singleton exposing the current user's login name and full name."""

    __metaclass__ = Singleton

    def __repr__(self):
        pairs = ['%s=%r' % (attr, getattr(self, attr)) for attr in ('username', 'fullname')]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(pairs))

    @property
    def username(self):
        if platform.system() == 'Windows':
            name = os.getenv('USERNAME')
        else:
            # pwd is imported only on the non-Windows path
            import pwd
            name = pwd.getpwuid(os.getuid()).pw_name
        return name.decode(sys.getfilesystemencoding())

    @property
    def fullname(self):
        if platform.system() == 'Windows':
            name = os.getenv('USERNAME')
        else:
            import pwd
            entry = pwd.getpwuid(os.getuid())
            # first comma-separated GECOS field, or the login name when it is empty
            name = entry.pw_gecos.split(',', 1)[0] or entry.pw_name
        return name.decode(sys.getfilesystemencoding())


# only the instance is kept; the class name is removed from the module namespace
user_info = UserInfo()
del UserInfo

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:50 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408888
Default Alt Text
(66 KB)

Event Timeline