Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/payloads/commonpolicy.py b/sipsimple/payloads/commonpolicy.py
index 4e1592f1..09fad7b4 100644
--- a/sipsimple/payloads/commonpolicy.py
+++ b/sipsimple/payloads/commonpolicy.py
@@ -1,357 +1,367 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Generic data types to be used in policy applications, according to
RFC4745.
"""
__all__ = ['namespace',
'CommonPolicyDocument',
'ConditionElement',
'ActionElement',
'TransformationElement',
'RuleExtension',
'IdentityOne',
'IdentityExcept',
'IdentityMany',
'Identity',
'Validity',
'Conditions',
'Actions',
'Transformations',
'Rule',
'RuleSet',
# Extensions
'FalseCondition',
'RuleDisplayName']
from sipsimple.payloads import ValidationError, XMLDocument, XMLElement, XMLListElement, XMLListRootElement, XMLAttribute, XMLElementID, XMLElementChild, XMLLocalizedStringElement, XMLDateTimeElement
+from sipsimple.payloads import IterateIDs, IterateItems, All
from sipsimple.payloads.datatypes import AnyURI, ID
namespace = 'urn:ietf:params:xml:ns:common-policy'
class CommonPolicyDocument(XMLDocument):
content_type = 'application/auth-policy+xml'
CommonPolicyDocument.register_namespace(namespace, prefix='cp', schema='common-policy.xsd')
## Mixin types for extensibility
class ConditionElement(object): pass
class ActionElement(object): pass
class TransformationElement(object): pass
class RuleExtension(object): pass
## Elements
class IdentityOne(XMLElement):
_xml_tag = 'one'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
id = XMLElementID('id', type=AnyURI, required=True, test_equal=True)
def __init__(self, id):
XMLElement.__init__(self)
self.id = id
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.id)
def __str__(self):
return self.id
def matches(self, uri):
return self.id == uri
class IdentityExcept(XMLElement):
_xml_tag = 'except'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
def _onset_id(self, attribute, value):
if value is not None:
self.domain = None
id = XMLAttribute('id', type=str, required=False, test_equal=True, onset=_onset_id)
del _onset_id
def _onset_domain(self, attribute, value):
if value is not None:
self.id = None
domain = XMLAttribute('domain', type=str, required=False, test_equal=True, onset=_onset_domain)
del _onset_domain
def __init__(self, id=None, domain=None):
XMLElement.__init__(self)
self.id = id
self.domain = domain
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.id, self.domain)
def __str__(self):
if self.id is not None:
return self.id
else:
return self.domain
def matches(self, uri):
if self.id is not None:
return self.id != uri
else:
return [self.domain] != uri.split('@', 1)[1:]
class IdentityMany(XMLListElement):
_xml_tag = 'many'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_children_order = {IdentityExcept.qname: 0}
_xml_item_type = IdentityExcept
domain = XMLAttribute('domain', type=str, required=False, test_equal=True)
def __init__(self, domain=None, exceptions=[]):
XMLListElement.__init__(self)
self.domain = domain
self.update(exceptions)
def __repr__(self):
return '%s(%r, %s)' % (self.__class__.__name__, self.domain, list(self))
def matches(self, uri):
if self.domain is not None:
if self.domain != uri.partition('@')[2]:
return False
for child in self:
if not child.matches(uri):
return False
return True
class Identity(XMLListElement):
_xml_tag = 'identity'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = (IdentityOne, IdentityMany)
def __init__(self, identities=[]):
XMLListElement.__init__(self)
self.update(identities)
def matches(self, uri):
for child in self:
if child.matches(uri):
return True
return False
class Sphere(XMLElement):
_xml_tag = 'sphere'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
value = XMLAttribute('value', type=str, required=True, test_equal=True)
def __init__(self, value):
XMLElement.__init__(self)
self.value = value
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.value)
class ValidFrom(XMLDateTimeElement):
_xml_tag = 'from'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
class ValidUntil(XMLDateTimeElement):
_xml_tag = 'until'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
class ValidityInterval(object):
def __init__(self, from_timestamp, until_timestamp):
self.valid_from = ValidFrom(from_timestamp)
self.valid_until = ValidUntil(until_timestamp)
def __eq__(self, other):
if isinstance(other, ValidityInterval):
return self is other or (self.valid_from == other.valid_from and self.valid_until == other.valid_until)
return NotImplemented
def __ne__(self, other):
if isinstance(other, ValidityInterval):
return self is not other and (self.valid_from != other.valid_from or self.valid_until != other.valid_until)
return NotImplemented
@classmethod
def from_elements(cls, from_element, until_element, xml_document=None):
instance = object.__new__(cls)
instance.valid_from = ValidFrom.from_element(from_element, xml_document)
instance.valid_until = ValidUntil.from_element(until_element, xml_document)
return instance
class Validity(XMLListElement):
_xml_tag = 'validity'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = ValidityInterval
def __init__(self, children=[]):
XMLListElement.__init__(self)
self.update(children)
def _parse_element(self, element):
iterator = iter(element)
for first_child in iterator:
second_child = iterator.next()
if first_child.tag == '{%s}from' % self._xml_namespace and second_child.tag == '{%s}until' % self._xml_namespace:
try:
item = ValidityInterval.from_elements(first_child, second_child, xml_document=self._xml_document)
except:
pass
else:
self._element_map[item.valid_from.element] = item
def _build_element(self):
for child in self:
child.valid_from.to_element()
child.valid_until.to_element()
def add(self, item):
if not isinstance(item, ValidityInterval):
raise TypeError("Validity element must be a ValidityInterval instance")
self._insert_element(item.valid_from.element)
self._insert_element(item.valid_until.element)
self._element_map[item.valid_from.element] = item
self.__dirty__ = True
def remove(self, item):
self.element.remove(item.valid_from.element)
self.element.remove(item.valid_until.element)
del self._element_map[item.valid_from.element]
self.__dirty__ = True
def check_validity(self):
if not self:
raise ValidationError("cannot have Validity element without any children")
super(Validity, self).check_validity(self)
class Conditions(XMLListElement):
_xml_tag = 'conditions'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_children_order = {Identity.qname: 0,
Sphere.qname: 1,
Validity.qname: 2}
_xml_item_type = (Identity, Sphere, Validity, ConditionElement)
def __init__(self, conditions=[]):
XMLListElement.__init__(self)
self.update(conditions)
class Actions(XMLListElement):
_xml_tag = 'actions'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = ActionElement
def __init__(self, actions=[]):
XMLListElement.__init__(self)
self.update(actions)
class Transformations(XMLListElement):
_xml_tag = 'transformations'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = TransformationElement
def __init__(self, transformations=[]):
XMLListElement.__init__(self)
self.update(transformations)
class Rule(XMLElement):
_xml_tag = 'rule'
_xml_namespace = namespace
_xml_extension_type = RuleExtension
_xml_document = CommonPolicyDocument
_xml_children_order = {Conditions.qname: 0,
Actions.qname: 1,
Transformations.qname: 2}
id = XMLElementID('id', type=ID, required=True, test_equal=True)
conditions = XMLElementChild('conditions', type=Conditions, required=False, test_equal=True)
actions = XMLElementChild('actions', type=Actions, required=False, test_equal=True)
transformations = XMLElementChild('transformations', type=Transformations, required=False, test_equal=True)
def __init__(self, id, conditions=None, actions=None, transformations=None):
XMLElement.__init__(self)
self.id = id
self.conditions = conditions
self.actions = actions
self.transformations = transformations
def __repr__(self):
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.id, self.conditions, self.actions, self.transformations)
class RuleSet(XMLListRootElement):
_xml_tag = 'ruleset'
_xml_namespace = namespace
_xml_document = CommonPolicyDocument
_xml_item_type = Rule
def __init__(self, rules=[]):
XMLListRootElement.__init__(self)
self.update(rules)
def __getitem__(self, key):
- return self._xmlid_map[Rule][key]
+ if key is IterateIDs:
+ return self._xmlid_map[Rule].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[Rule].itervalues()
+ else:
+ return self._xmlid_map[Rule][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[Rule][key])
+ if key is All:
+ for item in self._xmlid_map[Rule].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[Rule][key])
def get(self, key, default=None):
return self._xmlid_map[Rule].get(key, default)
#
# Extensions
#
agp_cp_namespace = 'urn:ag-projects:xml:ns:common-policy'
CommonPolicyDocument.register_namespace(agp_cp_namespace, prefix='agp-cp')
# A condition element in the AG Projects namespace, it will always be evaluated to false
# because it's not understood by servers
class FalseCondition(XMLElement, ConditionElement):
_xml_tag = 'false-condition'
_xml_namespace = agp_cp_namespace
_xml_document = CommonPolicyDocument
class RuleDisplayName(XMLLocalizedStringElement, RuleExtension):
_xml_tag = 'display-name'
_xml_namespace = agp_cp_namespace
_xml_document = CommonPolicyDocument
Rule.register_extension('display_name', RuleDisplayName)
diff --git a/sipsimple/payloads/dialoginfo.py b/sipsimple/payloads/dialoginfo.py
index f9f94713..5691621b 100644
--- a/sipsimple/payloads/dialoginfo.py
+++ b/sipsimple/payloads/dialoginfo.py
@@ -1,257 +1,267 @@
# Copyright (C) 2009-2011 AG Projects. See LICENSE for details.
#
"""Parses and produces dialog-info messages according to RFC4235."""
__all__ = ['namespace',
'DialogInfoDocument',
'DialogState',
'Replaces',
'ReferredBy',
'Identity',
'Param',
'Target',
'Local',
'Remote',
'Dialog',
'DialogInfo']
from sipsimple.payloads import XMLDocument, XMLListRootElement, XMLListElement, XMLStringElement, XMLNonNegativeIntegerElement, XMLElementChild, XMLEmptyElement, XMLElement, XMLElementID, XMLAttribute
+from sipsimple.payloads import IterateIDs, IterateItems, All
namespace = 'urn:ietf:params:xml:ns:dialog-info'
class DialogInfoDocument(XMLDocument):
content_type = "application/dialog-info+xml"
DialogInfoDocument.register_namespace(namespace, prefix=None, schema='dialog-info.xsd')
# Attribute value types
class StateValue(str):
def __new__(cls, value):
if value not in ('full', 'partial'):
raise ValueError("illegal value for state")
return str.__new__(cls, value)
class VersionValue(int):
def __new__(cls, value):
value = int.__new__(cls, value)
if value < 0:
raise ValueError("illegal value for version")
return value
class DirectionValue(str):
def __new__(cls, value):
if value not in ('initiator', 'recipient'):
raise ValueError("illegal value for direction")
return str.__new__(cls, value)
class DialogEventValue(str):
def __new__(cls, value):
if value not in ('rejected', 'cancelled', 'replaced', 'local-bye', 'remote-bye', 'error', 'timeout'):
raise ValueError("illegal value for dialog state event")
return str.__new__(cls, value)
class DialogStateValue(str):
def __new__(cls, value):
if value not in ('trying', 'proceeding', 'early', 'confirmed', 'terminated'):
raise ValueError("illegal value for dialog state")
return str.__new__(cls, value)
class CodeValue(int):
def __new__(cls, value):
value = int.__new__(cls, value)
if value < 100 or value > 699:
raise ValueError("illegal value for code")
return value
# Elements
class CallId(XMLStringElement):
_xml_tag = 'call-id'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class LocalTag(XMLStringElement):
_xml_tag = 'local-tag'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class RemoteTag(XMLStringElement):
_xml_tag = 'remote-tag'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class DialogState(XMLStringElement):
_xml_tag = 'state'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
_xml_value_type = DialogStateValue
code = XMLAttribute('code', type=int, required=False, test_equal=True)
event = XMLAttribute('event', type=DialogEventValue, required=False, test_equal=True)
class Duration(XMLNonNegativeIntegerElement):
_xml_tag = 'duration'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class Replaces(XMLEmptyElement):
_xml_tag = 'replaces'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
call_id = XMLAttribute('call_id', xmlname='call-id', type=str, required=True, test_equal=True)
local_tag = XMLAttribute('local_tag', xmlname='local-tag', type=str, required=True, test_equal=True)
remote_tag = XMLAttribute('remote_tag', xmlname='remote-tag', type=str, required=True, test_equal=True)
def __init__(self, call_id, local_tag, remote_tag):
XMLEmptyElement.__init__(self)
self.call_id = call_id
self.local_tag = local_tag
self.remote_tag = remote_tag
class ReferredBy(XMLStringElement):
_xml_tag = 'referred-by'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
display = XMLAttribute('display', type=str, required=False, test_equal=True)
class Identity(XMLStringElement):
_xml_tag = 'identity'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
display = XMLAttribute('display', type=str, required=False, test_equal=True)
class Param(XMLEmptyElement):
_xml_tag = 'param'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
pname = XMLAttribute('pname', type=str, required=True, test_equal=True)
pval = XMLAttribute('pval', type=str, required=True, test_equal=True)
def __init__(self, pname, pval):
XMLEmptyElement.__init__(self)
self.pname = pname
self.pval = pval
class Target(XMLListElement):
_xml_tag = 'target'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
_xml_item_type = Param
uri = XMLAttribute('uri', type=str, required=True, test_equal=True)
def __init__(self, uri, params=[]):
self.uri = uri
self.update(params)
class Participant(XMLElement):
identity = XMLElementChild('identity', type=Identity, required=False, test_equal=True)
target = XMLElementChild('target', type=Target, required=False, test_equal=True)
def __init__(self, identity=None, target=None):
XMLElement.__init__(self)
self.identity = identity
self.target = target
class Local(Participant):
_xml_tag = 'local'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class Remote(Participant):
_xml_tag = 'remote'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
class Dialog(XMLElement):
_xml_tag = 'dialog'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
id = XMLElementID('id', type=str, required=True, test_equal=True)
call_id = XMLAttribute('call_id', xmlname='call-id', type=str, required=False, test_equal=True)
local_tag = XMLAttribute('local_tag', xmlname='local-tag', type=str, required=False, test_equal=True)
remote_tag = XMLAttribute('remote_tag', xmlname='remote-tag', type=str, required=False, test_equal=True)
direction = XMLAttribute('direction', type=DirectionValue, required=False, test_equal=True)
state = XMLElementChild('state', type=DialogState, required=True, test_equal=True)
duration = XMLElementChild('duration', type=Duration, required=False, test_equal=True)
replaces = XMLElementChild('replaces', type=Replaces, required=False, test_equal=True)
referred_by = XMLElementChild('referred_by', type=ReferredBy, required=False, test_equal=True)
local = XMLElementChild('local', type=Local, required=False, test_equal=True)
remote = XMLElementChild('remote', type=Remote, required=False, test_equal=True)
def __init__(self, id, state, call_id=None, local_tag=None, remote_tag=None, direction=None, duration=None, replaces=None, referred_by=None, local=None, remote=None):
XMLElement.__init__(self)
self.id = id
self.state = state
self.call_id = call_id
self.local_tag = local_tag
self.remote_tag = remote_tag
self.direction = direction
self.duration = duration
self.replaces = replaces
self.referred_by = referred_by
self.local = local
self.remote = remote
class DialogInfo(XMLListRootElement):
_xml_tag = 'dialog-info'
_xml_namespace = namespace
_xml_document = DialogInfoDocument
_xml_children_order = {Dialog.qname: 0,
None: 1}
_xml_item_type = Dialog
version = XMLAttribute('version', type=VersionValue, required=True, test_equal=True)
state = XMLAttribute('state', type=StateValue, required=True, test_equal=True)
entity = XMLAttribute('entity', type=str, required=True, test_equal=True)
def __init__(self, version, state, entity, dialogs=[]):
XMLListRootElement.__init__(self)
self.version = version
self.state = state
self.entity = entity
self.update(dialogs)
def __getitem__(self, key):
- return self._xmlid_map[Dialog][key]
+ if key is IterateIDs:
+ return self._xmlid_map[Dialog].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[Dialog].itervalues()
+ else:
+ return self._xmlid_map[Dialog][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[Dialog][key])
+ if key is All:
+ for item in self._xmlid_map[Dialog].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[Dialog][key])
def get(self, key, default=None):
return self._xmlid_map[Dialog].get(key, default)
diff --git a/sipsimple/payloads/resourcelists.py b/sipsimple/payloads/resourcelists.py
index 1dc956cd..aa6a3a98 100644
--- a/sipsimple/payloads/resourcelists.py
+++ b/sipsimple/payloads/resourcelists.py
@@ -1,368 +1,378 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Resource lists (rfc4826) handling"""
__all__ = ['namespace',
'ResourceListsDocument',
'DisplayName',
'Entry',
'EntryRef',
'External',
'List',
'ResourceLists',
# Extensions
'EntryAttributes']
from collections import deque
from lxml import etree
from xml.sax.saxutils import quoteattr
from sipsimple.payloads import XMLDocument, XMLListRootElement, XMLElement, XMLListElement, XMLLocalizedStringElement, XMLElementID, XMLElementChild, ThisClass
+from sipsimple.payloads import IterateIDs, IterateItems, All
from sipsimple.payloads.datatypes import AnyURI
namespace = 'urn:ietf:params:xml:ns:resource-lists'
# excerpt from the RFC:
# <list>
# attribute "name" - optional, unique among the same level
# body: optional <display-name>, the sequence of entry/list/entry-ref/external
# <display-name>
# attribute xml:lang - optional
# body: utf8 string
# <entry>
# attribute "uri" - mandatory, unique among all other <uri> within the same parent
# body: optional <display-name>
# <entry-ref>
# attribute "ref" - mandatory, unique among all other <entry-ref> within the same parent
# body: optional <display-name>
# ref is a relative URI that resolves into <entry>
# <external>
# attribute "anchor" - mandatory, unique among all other anchor in <external> within the same parent
# anchor must be an absolute http uri that resolves into <list>
class ResourceListsDocument(XMLDocument):
content_type = 'application/resource-lists+xml'
ResourceListsDocument.register_namespace(namespace, prefix='rl', schema='resourcelists.xsd')
## Marker mixins
class ListElement(object): pass
class EntryExtension(object): pass
## Elements
class DisplayName(XMLLocalizedStringElement):
_xml_tag = 'display-name'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
class Entry(XMLElement):
_xml_tag = 'entry'
_xml_namespace = namespace
_xml_extension_type = EntryExtension
_xml_document = ResourceListsDocument
_xml_children_order = {DisplayName.qname: 0}
uri = XMLElementID('uri', type=AnyURI, required=True, test_equal=True)
display_name = XMLElementChild('display_name', type=DisplayName, required=False, test_equal=False)
def __init__(self, uri, display_name=None):
XMLElement.__init__(self)
self.uri = uri
self.display_name = display_name
def __unicode__(self):
return self.display_name and u'"%s" <%s>' % (self.display_name, self.uri) or self.uri
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.uri, self.display_name)
class EntryRef(XMLElement):
_xml_tag = 'entry-ref'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_children_order = {DisplayName.qname: 0}
ref = XMLElementID('ref', type=AnyURI, required=True, test_equal=True)
display_name = XMLElementChild('display_name', type=DisplayName, required=False, test_equal=False)
def __init__(self, ref, display_name=None):
XMLElement.__init__(self)
self.ref = ref
self.display_name = display_name
def __unicode__(self):
return self.display_name and '"%s" <%s>' % (self.display_name, self.ref) or self.ref
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.ref, self.display_name)
class External(XMLElement):
_xml_tag = 'external'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_children_order = {DisplayName.qname: 0}
anchor = XMLElementID('anchor', type=AnyURI, required=True, test_equal=True)
display_name = XMLElementChild('display_name', type=DisplayName, required=False, test_equal=False)
def __init__(self, anchor, display_name=None):
XMLElement.__init__(self)
self.anchor = anchor
self.display_name = display_name
def __unicode__(self):
return self.display_name and '"%s" <%s>' % (self.display_name, self.anchor) or self.anchor
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.anchor, self.display_name)
List = ThisClass # a List can contain items of its own kind
class List(XMLListElement):
_xml_tag = 'list'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_children_order = {DisplayName.qname: 0,
Entry.qname: 1,
EntryRef.qname: 1,
External.qname: 1}
_xml_item_type = (Entry, EntryRef, External, List, ListElement)
name = XMLElementID('name', type=unicode, required=False, test_equal=True)
display_name = XMLElementChild('display_name', type=DisplayName, required=False, test_equal=False)
def __init__(self, entries=[], name=None, display_name=None):
XMLListElement.__init__(self)
self.name = name
self.display_name = display_name
self.update(entries)
def __repr__(self):
return '%s(%s, %r, %r)' % (self.__class__.__name__, list(self), self.name, self.display_name)
def __unicode__(self):
name = u'List element'
if self.name is not None:
name += u' %s' % self.name
if self.display_name is not None:
name += u' (%s)' % self.display_name
return name
List._xml_children_order[List.qname] = 1 # cannot self reference in declaration
class ResourceLists(XMLListRootElement):
_xml_tag = 'resource-lists'
_xml_namespace = namespace
_xml_document = ResourceListsDocument
_xml_children_order = {List.qname: 0}
_xml_item_type = List
def __init__(self, lists=[]):
XMLListRootElement.__init__(self)
self.update(lists)
def __getitem__(self, key):
- return self._xmlid_map[List][key]
+ if key is IterateIDs:
+ return self._xmlid_map[List].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[List].itervalues()
+ else:
+ return self._xmlid_map[List][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[List][key])
+ if key is All:
+ for item in self._xmlid_map[List].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[List][key])
def get(self, key, default=None):
return self._xmlid_map[List].get(key, default)
def get_xpath(self, element):
if not isinstance(element, (List, Entry, EntryRef, External, ResourceLists)):
raise ValueError('can only find xpath for List, Entry, EntryRef or External elements')
nsmap = dict((namespace, prefix) for prefix, namespace in self._xml_document.nsmap.iteritems())
nsmap[self._xml_namespace] = None
xpath_nsmap = {}
root_xpath = '/' + self._xml_tag
if element is self:
return root_xpath
notexpanded = deque([self])
visited = set(notexpanded)
parents = {self: None}
obj = None
while notexpanded:
list = notexpanded.popleft()
for child in list:
if child is element:
parents[child] = list
obj = child
notexpanded.clear()
break
elif isinstance(child, List) and child not in visited:
parents[child] = list
notexpanded.append(child)
visited.add(child)
if obj is None:
return None
components = []
while obj is not self:
prefix = nsmap[obj._xml_namespace]
if prefix:
name = '%s:%s' % (prefix, obj._xml_tag)
xpath_nsmap[obj._xml_namespace] = prefix
else:
name = obj._xml_tag
if isinstance(obj, List):
if obj.name is not None:
components.append('/%s[@%s=%s]' % (name, List.name.xmlname, quoteattr(obj.name)))
else:
siblings = [l for l in parents[obj] if isinstance(l, List)]
components.append('/%s[%d]' % (name, siblings.index(obj)+1))
elif isinstance(obj, Entry):
components.append('/%s[@%s=%s]' % (name, Entry.uri.xmlname, quoteattr(obj.uri)))
elif isinstance(obj, EntryRef):
components.append('/%s[@%s=%s]' % (name, EntryRef.ref.xmlname, quoteattr(obj.ref)))
elif isinstance(obj, External):
components.append('/%s[@%s=%s]' % (name, External.anchor.xmlname, quoteattr(obj.anchor)))
obj = parents[obj]
components.reverse()
return root_xpath + ''.join(components) + ('?' + ''.join('xmlns(%s=%s)' % (prefix, namespace) for namespace, prefix in xpath_nsmap.iteritems()) if xpath_nsmap else '')
def find_parent(self, element):
if not isinstance(element, (List, Entry, EntryRef, External)):
raise ValueError('can only find parent for List, Entry, EntryRef or External elements')
if element is self:
return None
notexpanded = deque([self])
visited = set(notexpanded)
while notexpanded:
list = notexpanded.popleft()
for child in list:
if child is element:
return list
elif isinstance(child, List) and child not in visited:
notexpanded.append(child)
visited.add(child)
return None
#
# Extensions
#
class EntryAttributes(XMLElement, EntryExtension):
_xml_tag = 'attributes'
_xml_namespace = 'urn:ag-projects:xml:ns:resource-lists'
_xml_document = ResourceListsDocument
def __init__(self, iterable=(), **attributes):
XMLElement.__init__(self)
self._attributes = dict()
self.update(iterable, **attributes)
def _parse_element(self, element):
self._attributes = dict()
attribute_tag = '{%s}attribute' % self._xml_namespace
for child in (child for child in element if child.tag == attribute_tag):
if 'nil' in child.attrib:
self[child.attrib['name']] = None
else:
self[child.attrib['name']] = unicode(child.text or u'')
def _build_element(self):
self.element.clear()
attribute_tag = '{%s}attribute' % self._xml_namespace
for key, value in self.iteritems():
child = etree.SubElement(self.element, attribute_tag, nsmap=self._xml_document.nsmap)
child.attrib['name'] = key
if value is None:
child.attrib['nil'] = 'true'
else:
child.text = value
def __contains__(self, key):
return key in self._attributes
def __iter__(self):
return iter(self._attributes)
def __len__(self):
return len(self._attributes)
def __getitem__(self, key):
return self._attributes[key]
def __setitem__(self, key, value):
if self._attributes.get(key, None) == value:
return
self._attributes[key] = value
self.__dirty__ = True
def __delitem__(self, key):
del self._attributes[key]
self.__dirty__ = True
def clear(self):
if self._attributes:
self._attributes.clear()
self.__dirty__ = True
def get(self, key, default=None):
return self._attributes.get(key, default)
def has_key(self, key):
return key in self._attributes
def items(self):
return self._attributes.items()
def iteritems(self):
return self._attributes.iteritems()
def iterkeys(self):
return self._attributes.iterkeys()
def itervalues(self):
return self._attributes.itervalues()
def keys(self):
return self._attributes.keys()
def pop(self, key, *args):
value = self._attributes.pop(key, *args)
if not args or value is not args[0]:
self.__dirty__ = True
return value
def popitem(self):
value = self._attributes.popitem()
self.__dirty__ = True
return value
def setdefault(self, key, default=None):
value = self._attributes.setdefault(key, default)
if value is default:
self.__dirty__ = True
return value
def update(self, iterable=(), **attributes):
self._attributes.update(iterable, **attributes)
if iterable or attributes:
self.__dirty__ = True
ResourceListsDocument.register_namespace(EntryAttributes._xml_namespace, prefix='agp-rl')
Entry.register_extension('attributes', EntryAttributes)
diff --git a/sipsimple/payloads/rlsservices.py b/sipsimple/payloads/rlsservices.py
index c753a863..aa329e6a 100644
--- a/sipsimple/payloads/rlsservices.py
+++ b/sipsimple/payloads/rlsservices.py
@@ -1,127 +1,137 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""RFC4826 compliant parser/builder for application/rls-services+xml documents."""
__all__ = ['namespace',
'RLSServicesDocument',
'Packages',
'ResourceList',
'List',
'Service',
'RLSServices']
from sipsimple.payloads import XMLListRootElement, XMLElement, XMLListElement, XMLStringElement, XMLAnyURIElement, XMLElementID, XMLElementChild, XMLElementChoiceChild
+from sipsimple.payloads import IterateIDs, IterateItems, All
from sipsimple.payloads import resourcelists
from sipsimple.payloads.datatypes import AnyURI
namespace = 'urn:ietf:params:xml:ns:rls-services'
class RLSServicesDocument(resourcelists.ResourceListsDocument):
content_type = 'application/rls-services+xml'
RLSServicesDocument.register_namespace(namespace, prefix=None, schema='rlsservices.xsd')
## Marker mixins
class PackagesElement(object): pass
## Elements
class Package(XMLStringElement):
_xml_tag = 'package'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
class Packages(XMLListElement):
_xml_tag = 'packages'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
_xml_children_order = {Package.qname: 0}
_xml_item_type = (Package, PackagesElement)
def __init__(self, packages=[]):
XMLListElement.__init__(self)
self.update(packages)
def __iter__(self):
return (unicode(item) if type(item) is Package else item for item in super(Packages, self).__iter__())
def add(self, item):
if isinstance(item, basestring):
item = Package(item)
super(Packages, self).add(item)
def remove(self, item):
if isinstance(item, basestring):
package = Package(item)
try:
item = (entry for entry in super(Packages, self).__iter__() if entry == package).next()
except StopIteration:
raise KeyError(item)
super(Packages, self).remove(item)
class ResourceList(XMLAnyURIElement):
_xml_tag = 'resource-list'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
# This is identical to the list element in resourcelists, except for the
# namespace. We'll redefine the xml tag just for readability purposes.
class List(resourcelists.List):
_xml_tag = 'list'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
class Service(XMLElement):
_xml_tag = 'service'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
_xml_children_order = {List.qname: 0,
ResourceList.qname: 0,
Packages.qname: 1}
uri = XMLElementID('uri', type=AnyURI, required=True, test_equal=True)
list = XMLElementChoiceChild('list', types=(ResourceList, List), required=True, test_equal=True)
packages = XMLElementChild('packages', type=Packages, required=False, test_equal=True)
def __init__(self, uri, list=None, packages=None):
XMLElement.__init__(self)
self.uri = uri
self.list = list if list is not None else List()
self.packages = packages if packages is not None else Packages()
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__.__name__, self.uri, self.list, self.packages)
class RLSServices(XMLListRootElement):
_xml_tag = 'rls-services'
_xml_namespace = namespace
_xml_document = RLSServicesDocument
_xml_children_order = {Service.qname: 0}
_xml_item_type = Service
def __init__(self, services=[]):
XMLListRootElement.__init__(self)
self.update(services)
def __getitem__(self, key):
- return self._xmlid_map[Service][key]
+ if key is IterateIDs:
+ return self._xmlid_map[Service].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[Service].itervalues()
+ else:
+ return self._xmlid_map[Service][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[Service][key])
+ if key is All:
+ for item in self._xmlid_map[Service].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[Service][key])
def get(self, key, default=None):
return self._xmlid_map[Service].get(key, default)
diff --git a/sipsimple/payloads/watcherinfo.py b/sipsimple/payloads/watcherinfo.py
index 29c932c2..0499ba75 100644
--- a/sipsimple/payloads/watcherinfo.py
+++ b/sipsimple/payloads/watcherinfo.py
@@ -1,197 +1,216 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""Parses application/watcherinfo+xml documents according to RFC3857 and RFC3858."""
__all__ = ['namespace',
'NeedFullUpdateError',
'WatcherInfoDocument',
'Watcher',
'WatcherList',
'WatcherInfo']
from sipsimple.payloads import XMLDocument, XMLAnyURIElement, XMLListElement, XMLListRootElement, XMLElementID, XMLAttribute
+from sipsimple.payloads import IterateIDs, IterateItems, All
from sipsimple.payloads.datatypes import UnsignedLong, SIPURI
namespace = 'urn:ietf:params:xml:ns:watcherinfo'
class NeedFullUpdateError(Exception): pass
class WatcherInfoDocument(XMLDocument):
content_type = 'application/watcherinfo+xml'
WatcherInfoDocument.register_namespace(namespace, prefix=None, schema='watcherinfo.xsd')
## Attribute value types
class WatcherStatus(str):
def __new__(cls, value):
if value not in ('pending', 'active', 'waiting', 'terminated'):
raise ValueError('illegal status value for watcher')
return str.__new__(cls, value)
class WatcherEvent(str):
def __new__(cls, value):
if value not in ('subscribe', 'approved', 'deactivated', 'probation', 'rejected', 'timeout', 'giveup', 'noresource'):
raise ValueError('illegal event value for watcher')
return str.__new__(cls, value)
class WatcherInfoState(str):
def __new__(cls, value):
if value not in ('full', 'partial'):
raise ValueError('illegal state value for watcherinfo')
return str.__new__(cls, value)
## XMLElements
class Watcher(XMLAnyURIElement):
"""
Definition for a watcher in a watcherinfo document
Provides the attributes:
* id
* status
* event
* display_name
* expiration
* duration
* sipuri
Can be transformed to a string with the format DISPLAY_NAME <SIP_URI>.
"""
_xml_tag = 'watcher'
_xml_namespace = namespace
_xml_document = WatcherInfoDocument
_xml_value_type = SIPURI
id = XMLElementID('id', type=str, required=True, test_equal=True)
status = XMLAttribute('status', type=WatcherStatus, required=True, test_equal=True)
event = XMLAttribute('event', type=WatcherEvent, required=True, test_equal=True)
display_name = XMLAttribute('display_name', xmlname='display-name', type=str, required=False, test_equal=True)
expiration = XMLAttribute('expiration', type=UnsignedLong, required=False, test_equal=False)
duration = XMLAttribute('duration', xmlname='duration-subscribed', type=UnsignedLong, required=False, test_equal=False)
sipuri = XMLAnyURIElement.value
def __init__(self, sipuri, id, status, event, display_name=None, expiration=None, duration=None):
XMLAnyURIElement.__init__(self)
self.sipuri = sipuri
self.id = id
self.status = status
self.event = event
self.display_name = display_name
self.expiration = expiration
self.duration = duration
def __repr__(self):
return '%s(%r, %r, %r, %r, %r, %r, %r)' % (self.__class__.__name__, self.sipuri, self.id, self.status, self.event, self.display_name, self.expiration, self.duration)
def __str__(self):
return '"%s" <%s>' % (self.display_name, self.sipuri) if self.display_name else self.sipuri
class WatcherList(XMLListElement):
"""
Definition for a list of watchers in a watcherinfo document
It behaves like a list in that it can be indexed by a number, can be
iterated and counted.
It also provides the properties pending, active and terminated which are
generators returning Watcher objects with the corresponding status.
"""
_xml_tag = 'watcher-list'
_xml_namespace = namespace
_xml_document = WatcherInfoDocument
_xml_children_order = {Watcher.qname: 0}
_xml_item_type = Watcher
resource = XMLElementID('resource', type=SIPURI, required=True, test_equal=True)
package = XMLAttribute('package', type=str, required=True, test_equal=True)
def __init__(self, resource, package, watchers=[]):
XMLListElement.__init__(self)
self.resource = resource
self.package = package
self.update(watchers)
+ def __repr__(self):
+ return '%s(%r, %r, %r)' % (self.__class__.__name__, self.resource, self.package, list(self))
+
def __getitem__(self, key):
- return self._xmlid_map[Watcher][key]
+ if key is IterateIDs:
+ return self._xmlid_map[Watcher].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[Watcher].itervalues()
+ else:
+ return self._xmlid_map[Watcher][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[Watcher][key])
-
- def __repr__(self):
- return '%s(%r, %r, %r)' % (self.__class__.__name__, self.resource, self.package, list(self))
+ if key is All:
+ for item in self._xmlid_map[Watcher].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[Watcher][key])
def get(self, key, default=None):
return self._xmlid_map[Watcher].get(key, default)
pending = property(lambda self: (watcher for watcher in self if watcher.status == 'pending'))
waiting = property(lambda self: (watcher for watcher in self if watcher.status == 'waiting'))
active = property(lambda self: (watcher for watcher in self if watcher.status == 'active'))
terminated = property(lambda self: (watcher for watcher in self if watcher.status == 'terminated'))
class WatcherInfo(XMLListRootElement):
"""
Definition for watcher info: a list of WatcherList elements
The user agent instantiates this class once it subscribes to a *.winfo event
and calls its update() method with the applicatin/watcherinfo+xml documents
it receives via NOTIFY.
The watchers can be accessed in two ways:
1. via the wlists property, which returns a list of WatcherList elements;
2. via the pending, active and terminated properties, which return
dictionaries, mapping WatcherList objects to lists of Watcher objects.
Since WatcherList objects can be compared for equality to SIP URI strings,
representing the presentity to which the watchers have subscribed, the
dictionaries can also be indexed by such strings.
"""
_xml_tag = 'watcherinfo'
_xml_namespace = namespace
_xml_document = WatcherInfoDocument
_xml_children_order = {WatcherList.qname: 0}
_xml_item_type = WatcherList
version = XMLAttribute('version', type=int, required=True, test_equal=True)
state = XMLAttribute('state', type=WatcherInfoState, required=True, test_equal=True)
def __init__(self, version=-1, state='full', wlists=[]):
XMLListRootElement.__init__(self)
self.version = version
self.state = state
self.update(wlists)
+ def __repr__(self):
+ return '%s(%r, %r, %r)' % (self.__class__.__name__, self.version, self.state, list(self))
+
def __getitem__(self, key):
- return self._xmlid_map[WatcherList][key]
+ if key is IterateIDs:
+ return self._xmlid_map[WatcherList].iterkeys()
+ elif key is IterateItems:
+ return self._xmlid_map[WatcherList].itervalues()
+ else:
+ return self._xmlid_map[WatcherList][key]
def __delitem__(self, key):
- self.remove(self._xmlid_map[WatcherList][key])
-
- def __repr__(self):
- return '%s(%r, %r, %r)' % (self.__class__.__name__, self.version, self.state, list(self))
+ if key is All:
+ for item in self._xmlid_map[WatcherList].values():
+ self.remove(item)
+ else:
+ self.remove(self._xmlid_map[WatcherList][key])
def get(self, key, default=None):
return self._xmlid_map[WatcherList].get(key, default)
wlists = property(lambda self: self._element_map.values())
pending = property(lambda self: dict((wlist, list(wlist.pending)) for wlist in self._element_map.itervalues()))
waiting = property(lambda self: dict((wlist, list(wlist.waiting)) for wlist in self._element_map.itervalues()))
active = property(lambda self: dict((wlist, list(wlist.active)) for wlist in self._element_map.itervalues()))
terminated = property(lambda self: dict((wlist, list(wlist.terminated)) for wlist in self._element_map.itervalues()))

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 2:28 AM (12 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408600
Default Alt Text
(46 KB)

Event Timeline