diff --git a/sipsimple/account/xcap/__init__.py b/sipsimple/account/xcap/__init__.py
index c4f6406b..cc9c558a 100644
--- a/sipsimple/account/xcap/__init__.py
+++ b/sipsimple/account/xcap/__init__.py
@@ -1,1766 +1,1767 @@
"""High-level management of XCAP documents based on OMA specifications"""
__all__ = ['Group', 'Contact', 'ContactURI', 'EventHandling', 'Policy', 'Icon', 'OfflineStatus', 'XCAPManager', 'XCAPTransaction']
import base64
import pickle
import os
import random
import socket
import weakref
from io import StringIO
from collections import OrderedDict
from datetime import datetime
from itertools import chain
from operator import attrgetter
from urllib.error import URLError
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from eventlib import api, coros, proc
from eventlib.green.httplib import BadStatusLine
from twisted.internet.error import ConnectionLost
from xcaplib.green import XCAPClient
from xcaplib.error import HTTPError
from zope.interface import implementer
from sipsimple import log
from sipsimple.account.subscription import Subscriber, Content
from sipsimple.account.xcap.storage import IXCAPStorage, XCAPStorageError
from sipsimple.configuration.datatypes import SIPAddress
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import ParserError, IterateTypes, IterateIDs, IterateItems, All
from sipsimple.payloads import addressbook, commonpolicy, dialogrules, omapolicy, pidf, prescontent, presrules, resourcelists, rlsservices, xcapcaps, xcapdiff
from sipsimple.payloads import rpid; del rpid # needs to be imported to register its namespace
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, Worker, run_in_green_thread
from sipsimple.util import execute_once
class XCAPError(Exception): pass
class FetchRequiredError(XCAPError): pass
class Document(object):
name = None
application = None
payload_type = None
default_namespace = None
global_tree = None
filename = None
cached = True
def __init__(self, manager):
self.manager = weakref.proxy(manager)
self.content = None
self.etag = None
self.fetch_time = datetime.fromtimestamp(0)
self.update_time = datetime.fromtimestamp(0)
self.dirty = False
self.supported = False
def __bool__(self):
return self.content is not None
@property
def dirty(self):
return self.__dict__['dirty'] or (self.content is not None and self.content.__dirty__)
@dirty.setter
def dirty(self, dirty):
if self.content is not None and not dirty:
self.content.__dirty__ = dirty
self.__dict__['dirty'] = dirty
@property
def relative_url(self):
return self.url[len(self.manager.xcap_root):].lstrip('/')
@property
def url(self):
return self.manager.client.get_url(self.application, None, globaltree=self.global_tree, filename=self.filename)
def load_from_cache(self):
if not self.cached:
return
try:
- document = StringIO(self.manager.storage.load(self.name).decode())
+ doc_io = self.manager.storage.load(self.name)
+ document = StringIO(doc_io.decode())
self.etag = document.readline().strip() or None
self.content = self.payload_type.parse(document)
self.__dict__['dirty'] = False
except (XCAPStorageError, ParserError):
self.etag = None
self.content = None
self.dirty = False
self.fetch_time = datetime.utcnow()
def initialize(self, server_caps):
self.supported = self.application in server_caps.auids
if not self.supported:
self.reset()
def reset(self):
if self.cached and self.content is not None:
try:
self.manager.storage.delete(self.name)
except XCAPStorageError:
pass
self.content = None
self.etag = None
self.dirty = False
def fetch(self):
try:
document = self.manager.client.get(self.application, etagnot=self.etag, globaltree=self.global_tree, headers={'Accept': self.payload_type.content_type}, filename=self.filename)
self.content = self.payload_type.parse(document)
self.etag = document.etag
self.__dict__['dirty'] = False
except (BadStatusLine, ConnectionLost, URLError, socket.error) as e:
raise XCAPError("failed to fetch %s document: %s" % (self.name, e))
except HTTPError as e:
if e.status == 404: # Not Found
if self.content is not None:
self.reset()
self.fetch_time = datetime.utcnow()
elif e.status != 304: # Other than Not Modified:
raise XCAPError("failed to fetch %s document: %s" % (self.name, e))
except ParserError as e:
raise XCAPError("failed to parse %s document: %s" % (self.name, e))
else:
self.fetch_time = datetime.utcnow()
if self.cached:
try:
self.manager.storage.save(self.name, self.etag + os.linesep + document)
except XCAPStorageError:
pass
def update(self):
if not self.dirty:
return
data = self.content.toxml() if self.content is not None else None
try:
kw = dict(etag=self.etag) if self.etag is not None else dict(etagnot='*')
if data is not None:
response = self.manager.client.put(self.application, data, globaltree=self.global_tree, filename=self.filename, headers={'Content-Type': self.payload_type.content_type}, **kw)
else:
response = self.manager.client.delete(self.application, data, globaltree=self.global_tree, filename=self.filename, **kw)
except (BadStatusLine, ConnectionLost, URLError) as e:
raise XCAPError("failed to update %s document: %s" % (self.name, e))
except HTTPError as e:
if e.status == 412: # Precondition Failed
raise FetchRequiredError("document %s was modified externally" % self.name)
elif e.status == 404 and data is None: # attempted to delete a document that did't exist in the first place
pass
else:
raise XCAPError("failed to update %s document: %s" % (self.name, e))
self.etag = response.etag if data is not None else None
self.dirty = False
self.update_time = datetime.utcnow()
if self.cached:
try:
if data is not None:
self.manager.storage.save(self.name, self.etag + os.linesep + data)
else:
self.manager.storage.delete(self.name)
except XCAPStorageError:
pass
class DialogRulesDocument(Document):
name = 'dialog-rules'
application = 'org.openxcap.dialog-rules'
payload_type = dialogrules.DialogRulesDocument
default_namespace = dialogrules.namespace
global_tree = False
filename = 'index'
class PresRulesDocument(Document):
name = 'pres-rules'
application = 'org.openmobilealliance.pres-rules'
payload_type = presrules.PresRulesDocument
default_namespace = presrules.namespace
global_tree = False
filename = 'index'
class ResourceListsDocument(Document):
name = 'resource-lists'
application = 'resource-lists'
payload_type = resourcelists.ResourceListsDocument
default_namespace = resourcelists.namespace
global_tree = False
filename = 'index'
def update(self):
if self.content is not None:
sipsimple_addressbook = self.content['sipsimple_addressbook']
groups = ItemCollection(sipsimple_addressbook[addressbook.Group, IterateItems])
contacts = ItemCollection(sipsimple_addressbook[addressbook.Contact, IterateItems])
policies = ItemCollection(sipsimple_addressbook[addressbook.Policy, IterateItems])
for group, missing_id in ((group, missing_id) for group in groups for missing_id in [id for id in group.contacts if id not in contacts]):
group.contacts.remove(missing_id)
if any(item.__dirty__ for item in chain(contacts, policies)):
oma_grantedcontacts = self.content['oma_grantedcontacts']
oma_blockedcontacts = self.content['oma_blockedcontacts']
dialog_grantedcontacts = self.content['dialog_grantedcontacts']
dialog_blockedcontacts = self.content['dialog_blockedcontacts']
sipsimple_presence_rls = self.content['sipsimple_presence_rls']
sipsimple_dialog_rls = self.content['sipsimple_dialog_rls']
all_contact_uris = set(uri.uri for contact in contacts for uri in contact.uris)
contact_allow_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.policy=='allow')
contact_block_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.policy=='block')
contact_allow_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.policy=='allow')
contact_block_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.policy=='block')
contact_subscribe_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.subscribe==True)
contact_subscribe_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.subscribe==True)
policy_allow_presence_uris = set(policy.uri for policy in policies if policy.presence.policy=='allow')
policy_block_presence_uris = set(policy.uri for policy in policies if policy.presence.policy=='block')
policy_allow_dialog_uris = set(policy.uri for policy in policies if policy.dialog.policy=='allow')
policy_block_dialog_uris = set(policy.uri for policy in policies if policy.dialog.policy=='block')
policy_subscribe_presence_uris = set(policy.uri for policy in policies if policy.presence.subscribe==True)
policy_subscribe_dialog_uris = set(policy.uri for policy in policies if policy.dialog.subscribe==True)
allowed_presence_uris = contact_allow_presence_uris - contact_block_presence_uris | policy_allow_presence_uris - policy_block_presence_uris - all_contact_uris
blocked_presence_uris = contact_block_presence_uris | policy_block_presence_uris - all_contact_uris
allowed_dialog_uris = contact_allow_dialog_uris - contact_block_dialog_uris | policy_allow_dialog_uris - policy_block_dialog_uris - all_contact_uris
blocked_dialog_uris = contact_block_dialog_uris | policy_block_dialog_uris - all_contact_uris
subscribe_presence_uris = contact_subscribe_presence_uris | policy_subscribe_presence_uris - all_contact_uris
subscribe_dialog_uris = contact_subscribe_dialog_uris | policy_subscribe_dialog_uris - all_contact_uris
if allowed_presence_uris != set(entry.uri for entry in oma_grantedcontacts):
oma_grantedcontacts.clear()
oma_grantedcontacts.update(resourcelists.Entry(uri) for uri in allowed_presence_uris)
if blocked_presence_uris != set(entry.uri for entry in oma_blockedcontacts):
oma_blockedcontacts.clear()
oma_blockedcontacts.update(resourcelists.Entry(uri) for uri in blocked_presence_uris)
if allowed_dialog_uris != set(entry.uri for entry in dialog_grantedcontacts):
dialog_grantedcontacts.clear()
dialog_grantedcontacts.update(resourcelists.Entry(uri) for uri in allowed_dialog_uris)
if blocked_dialog_uris != set(entry.uri for entry in dialog_blockedcontacts):
dialog_blockedcontacts.clear()
dialog_blockedcontacts.update(resourcelists.Entry(uri) for uri in blocked_dialog_uris)
if subscribe_presence_uris != set(entry.uri for entry in sipsimple_presence_rls):
sipsimple_presence_rls.clear()
sipsimple_presence_rls.update(resourcelists.Entry(uri) for uri in subscribe_presence_uris)
if subscribe_dialog_uris != set(entry.uri for entry in sipsimple_dialog_rls):
sipsimple_dialog_rls.clear()
sipsimple_dialog_rls.update(resourcelists.Entry(uri) for uri in subscribe_dialog_uris)
super(ResourceListsDocument, self).update()
class RLSServicesDocument(Document):
name = 'rls-services'
application = 'rls-services'
payload_type = rlsservices.RLSServicesDocument
default_namespace = rlsservices.namespace
global_tree = False
filename = 'index'
class XCAPCapsDocument(Document):
name = 'xcap-caps'
application = 'xcap-caps'
payload_type = xcapcaps.XCAPCapabilitiesDocument
default_namespace = xcapcaps.namespace
global_tree = True
filename = 'index'
cached = False
def initialize(self):
self.supported = True
class StatusIconDocument(Document):
name = 'status-icon'
application = 'org.openmobilealliance.pres-content'
payload_type = prescontent.PresenceContentDocument
default_namespace = prescontent.namespace
global_tree = False
filename = 'oma_status-icon/index'
class PIDFManipulationDocument(Document):
name = 'pidf-manipulation'
application = 'pidf-manipulation'
payload_type = pidf.PIDFDocument
default_namespace = pidf.pidf_namespace
global_tree = False
filename = 'index'
class ItemCollection(object):
def __init__(self, items):
self.items = OrderedDict((item.id, item) for item in items)
def __getitem__(self, key):
return self.items[key]
def __contains__(self, key):
return key in self.items
def __iter__(self):
return iter(list(self.items.values()))
def __reversed__(self):
return (self[id] for id in reversed(self.items))
def __len__(self):
return len(self.items)
def __eq__(self, other):
if isinstance(other, ItemCollection):
return self.items == other.items
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, list(self.items.values()))
def ids(self):
return list(self.items.keys())
def iterids(self):
return iter(list(self.items.keys()))
def get(self, key, default=None):
return self.items.get(key, default)
def add(self, item):
self.items[item.id] = item
def remove(self, item):
del self.items[item.id]
class ContactList(ItemCollection):
pass
class ContactURIList(ItemCollection):
def __init__(self, items, default=None):
super(ContactURIList, self).__init__(items)
self.default = default
def __eq__(self, other):
if isinstance(other, ContactURIList):
return self.items == other.items and self.default == other.default
return NotImplemented
def __repr__(self):
return "%s(%r, default=%r)" % (self.__class__.__name__, list(self.items.values()), self.default)
class Group(object):
def __init__(self, id, name, contacts, **attributes):
self.id = id
self.name = name
self.contacts = contacts
self.attributes = attributes
def __eq__(self, other):
if isinstance(other, Group):
return self is other or (self.id == other.id and self.name == other.name and self.contacts.ids() == other.contacts.ids() and self.attributes == other.attributes)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __setattr__(self, name, value):
if name == 'contacts' and not isinstance(value, ContactList):
value = ContactList(value)
object.__setattr__(self, name, value)
class ContactURI(object):
def __init__(self, id, uri, type, **attributes):
self.id = id
self.uri = uri
self.type = type
self.attributes = attributes
def __eq__(self, other):
if isinstance(other, ContactURI):
return self is other or (self.id == other.id and self.uri == other.uri and self.type == other.type and self.attributes == other.attributes)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
class EventHandling(object):
def __init__(self, policy, subscribe):
self.policy = policy
self.subscribe = subscribe
def __eq__(self, other):
if isinstance(other, EventHandling):
return self is other or (self.policy == other.policy and self.subscribe == other.subscribe)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.policy, self.subscribe)
class Contact(object):
def __init__(self, id, name, uris, presence_handling=None, dialog_handling=None, **attributes):
self.id = id
self.name = name
self.uris = uris
self.dialog = dialog_handling or EventHandling(policy='default', subscribe=False)
self.presence = presence_handling or EventHandling(policy='default', subscribe=False)
self.attributes = attributes
def __eq__(self, other):
if isinstance(other, Contact):
return self is other or (self.id == other.id and self.name == other.name and self.uris == other.uris and self.dialog == other.dialog and self.presence == other.presence and
self.attributes == other.attributes)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __setattr__(self, name, value):
if name == 'uris' and not isinstance(value, ContactURIList):
value = ContactURIList(value)
object.__setattr__(self, name, value)
class Policy(object):
def __init__(self, id, uri, name, presence_handling=None, dialog_handling=None, **attributes):
self.id = id
self.uri = uri
self.name = name
self.dialog = dialog_handling or EventHandling(policy='default', subscribe=False)
self.presence = presence_handling or EventHandling(policy='default', subscribe=False)
self.attributes = attributes
def __eq__(self, other):
if isinstance(other, Policy):
return self is other or (self.id == other.id and self.uri == other.uri and self.name == other.name and self.dialog == other.dialog and self.presence == other.presence and
self.attributes == other.attributes)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
class Addressbook(object):
def __init__(self, contacts, groups, policies):
self.contacts = contacts
self.groups = groups
self.policies = policies
def __eq__(self, other):
if isinstance(other, Addressbook):
return self is other or (self.contacts == other.contacts and self.groups == other.groups and self.policies == other.policies)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
@classmethod
def from_payload(cls, payload):
def payload_to_contact(payload):
uris = ContactURIList((ContactURI(uri.id, uri.uri, uri.type, **(uri.attributes or {})) for uri in payload.uris), default=payload.uris.default)
presence_handling = EventHandling(payload.presence.policy.value, payload.presence.subscribe.value)
dialog_handling = EventHandling(payload.dialog.policy.value, payload.dialog.subscribe.value)
return Contact(payload.id, payload.name.value, uris, presence_handling, dialog_handling, **(payload.attributes or {}))
def payload_to_group(payload):
return Group(payload.id, payload.name.value, [contacts[contact_id] for contact_id in payload.contacts], **(payload.attributes or {}))
def payload_to_policy(payload):
presence_handling = EventHandling(payload.presence.policy.value, payload.presence.subscribe.value)
dialog_handling = EventHandling(payload.dialog.policy.value, payload.dialog.subscribe.value)
return Policy(payload.id, payload.uri, payload.name.value, presence_handling, dialog_handling, **(payload.attributes or {}))
contacts = ItemCollection(payload_to_contact(item) for item in payload[addressbook.Contact, IterateItems])
groups = ItemCollection(payload_to_group(item) for item in payload[addressbook.Group, IterateItems])
policies = ItemCollection(payload_to_policy(item) for item in payload[addressbook.Policy, IterateItems])
return cls(contacts, groups, policies)
class PresenceRules(object):
def __init__(self, default_policy):
self.default_policy = default_policy
def __eq__(self, other):
if isinstance(other, PresenceRules):
return self is other or (self.default_policy == other.default_policy)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
@classmethod
def from_payload(cls, default_rule):
default_policy = next(item for item in default_rule.actions if isinstance(item, presrules.SubHandling)).value
return cls(default_policy)
class DialogRules(object):
def __init__(self, default_policy):
self.default_policy = default_policy
def __eq__(self, other):
if isinstance(other, DialogRules):
return self is other or (self.default_policy == other.default_policy)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
@classmethod
def from_payload(cls, default_rule):
if default_rule is not None:
default_policy = next(item for item in default_rule.actions if isinstance(item, dialogrules.SubHandling)).value
else:
default_policy = None
return cls(default_policy)
class Icon(object):
__mimetypes__ = ('image/jpeg', 'image/png', 'image/gif')
def __init__(self, data, mime_type, description=None):
self.data = data
self.mime_type = mime_type
self.description = description
self.url = None
self.etag = None
def __eq__(self, other):
if isinstance(other, Icon):
return self is other or (self.data == other.data and self.mime_type == other.mime_type and self.description == other.description)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __setattr__(self, name, value):
if name == 'mime_type' and value not in self.__mimetypes__:
raise ValueError("invalid mime type: '%s'. Should be one of: %s" % (value, ', '.join(self.__mimetypes__)))
object.__setattr__(self, name, value)
@classmethod
def from_payload(cls, payload):
try:
data = base64.decodestring(payload.data.value)
except Exception:
return None
else:
description = payload.description.value if payload.description else None
return cls(data, payload.mime_type.value, description)
class OfflineStatus(object):
__slots__ = ('pidf',)
def __init__(self, pidf):
self.pidf = pidf
def __setattr__(self, name, value):
if name == 'pidf' and not isinstance(value, pidf.PIDF):
raise ValueError("pidf must be a PIDF payload")
object.__setattr__(self, name, value)
def __eq__(self, other):
if isinstance(other, OfflineStatus):
return self is other or (self.pidf == other.pidf)
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __getstate__(self):
return {'pidf': self.pidf.toxml()}
def __setstate__(self, state):
self.pidf = pidf.PIDFDocument.parse(state['pidf'])
class Operation(object):
__params__ = ()
def __init__(self, **params):
for name, value in list(params.items()):
setattr(self, name, value)
for param in set(self.__params__).difference(params):
raise ValueError("missing operation parameter: '%s'" % param)
self.applied = False
self.timestamp = datetime.utcnow()
class NormalizeOperation(Operation):
__params__ = ()
class AddContactOperation(Operation):
__params__ = ('contact',)
class UpdateContactOperation(Operation):
__params__ = ('contact', 'attributes')
class RemoveContactOperation(Operation):
__params__ = ('contact',)
class AddContactURIOperation(Operation):
__params__ = ('contact', 'uri')
class UpdateContactURIOperation(Operation):
__params__ = ('contact', 'uri', 'attributes')
class RemoveContactURIOperation(Operation):
__params__ = ('contact', 'uri')
class AddGroupOperation(Operation):
__params__ = ('group',)
class UpdateGroupOperation(Operation):
__params__ = ('group', 'attributes')
class RemoveGroupOperation(Operation):
__params__ = ('group',)
class AddGroupMemberOperation(Operation):
__params__ = ('group', 'contact')
class RemoveGroupMemberOperation(Operation):
__params__ = ('group', 'contact')
class AddPolicyOperation(Operation):
__params__ = ('policy',)
class UpdatePolicyOperation(Operation):
__params__ = ('policy', 'attributes')
class RemovePolicyOperation(Operation):
__params__ = ('policy',)
class SetDefaultPresencePolicyOperation(Operation):
__params__ = ('policy',)
class SetDefaultDialogPolicyOperation(Operation):
__params__ = ('policy',)
class SetStatusIconOperation(Operation):
__params__ = ('icon',)
class SetOfflineStatusOperation(Operation):
__params__ = ('status',)
class XCAPSubscriber(Subscriber):
__transports__ = frozenset(['tls', 'tcp'])
@property
def event(self):
return 'xcap-diff'
@property
def content(self):
rlist = resourcelists.List()
for document in (doc for doc in self.account.xcap_manager.documents if doc.supported):
rlist.add(resourcelists.Entry(document.relative_url))
return Content(resourcelists.ResourceLists([rlist]).toxml(), resourcelists.ResourceListsDocument.content_type)
@implementer(IObserver)
class XCAPManager(object):
def __init__(self, account):
from sipsimple.application import SIPApplication
if SIPApplication.storage is None:
raise RuntimeError("SIPApplication.storage must be defined before instantiating XCAPManager")
storage = SIPApplication.storage.xcap_storage_factory(account.id)
if not IXCAPStorage.providedBy(storage):
raise TypeError("storage must implement the IXCAPStorage interface")
self.account = account
self.storage = storage
self.storage_factory = SIPApplication.storage.xcap_storage_factory
self.client = None
self.command_proc = None
self.command_channel = coros.queue()
self.last_fetch_time = datetime.fromtimestamp(0)
self.last_update_time = datetime.fromtimestamp(0)
self.not_executed_fetch = None
self.state = 'stopped'
self.timer = None
self.transaction_level = 0
self.xcap_subscriber = None
self.server_caps = XCAPCapsDocument(self)
self.dialog_rules = DialogRulesDocument(self)
self.pidf_manipulation = PIDFManipulationDocument(self)
self.pres_rules = PresRulesDocument(self)
self.resource_lists = ResourceListsDocument(self)
self.rls_services = RLSServicesDocument(self)
self.status_icon = StatusIconDocument(self)
for document in self.documents:
document.load_from_cache()
try:
journal = self.storage.load('journal')
except XCAPStorageError:
self.journal = []
else:
try:
self.journal = pickle.loads(journal)
except Exception:
self.journal = []
for operation in self.journal:
operation.applied = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=account, name='CFGSettingsObjectDidChange')
notification_center.add_observer(self, sender=account, name='CFGSettingsObjectWasDeleted')
@property
def state(self):
return self.__dict__['state']
@state.setter
def state(self, value):
old_value = self.__dict__.get('state', Null)
self.__dict__['state'] = value
if old_value != value and old_value is not Null:
notification_center = NotificationCenter()
notification_center.post_notification('XCAPManagerDidChangeState', sender=self, data=NotificationData(prev_state=old_value, state=value))
@property
def documents(self):
return [self.resource_lists, self.rls_services, self.pres_rules, self.dialog_rules, self.pidf_manipulation, self.status_icon]
@property
def document_names(self):
return [document.name for document in self.documents]
@property
def xcap_root(self):
return getattr(self.client, 'root', None)
@property
def rls_presence_uri(self):
return SIPAddress('%s+presence@%s' % (self.account.id.username, self.account.id.domain))
@property
def rls_dialog_uri(self):
return SIPAddress('%s+dialog@%s' % (self.account.id.username, self.account.id.domain))
@execute_once
def init(self):
"""
Initializes the XCAP manager before it can be started. Needs to be
called before any other method and in a green thread.
"""
self.command_proc = proc.spawn(self._run)
@run_in_green_thread
def start(self):
"""
Starts the XCAP manager. This method needs to be called in a green
thread.
"""
command = Command('start')
self.command_channel.send(command)
command.wait()
@run_in_green_thread
def stop(self):
"""
Stops the XCAP manager. This method blocks until all the operations are
stopped and needs to be called in a green thread.
"""
command = Command('stop')
self.command_channel.send(command)
command.wait()
def transaction(self):
return XCAPTransaction(self)
@run_in_twisted_thread
def start_transaction(self):
self.transaction_level += 1
@run_in_twisted_thread
def commit_transaction(self):
if self.transaction_level == 0:
return
self.transaction_level -= 1
if self.transaction_level == 0 and self.journal:
self._save_journal()
self.command_channel.send(Command('update'))
def add_contact(self, contact):
self._schedule_operation(AddContactOperation(contact=contact))
def update_contact(self, contact, attributes):
self._schedule_operation(UpdateContactOperation(contact=contact, attributes=attributes))
def remove_contact(self, contact):
self._schedule_operation(RemoveContactOperation(contact=contact))
def add_contact_uri(self, contact, uri):
self._schedule_operation(AddContactURIOperation(contact=contact, uri=uri))
def update_contact_uri(self, contact, uri, attributes):
self._schedule_operation(UpdateContactURIOperation(contact=contact, uri=uri, attributes=attributes))
def remove_contact_uri(self, contact, uri):
self._schedule_operation(RemoveContactURIOperation(contact=contact, uri=uri))
def add_group(self, group):
self._schedule_operation(AddGroupOperation(group=group))
def update_group(self, group, attributes):
self._schedule_operation(UpdateGroupOperation(group=group, attributes=attributes))
def remove_group(self, group):
self._schedule_operation(RemoveGroupOperation(group=group))
def add_group_member(self, group, contact):
self._schedule_operation(AddGroupMemberOperation(group=group, contact=contact))
def remove_group_member(self, group, contact):
self._schedule_operation(RemoveGroupMemberOperation(group=group, contact=contact))
def add_policy(self, policy):
self._schedule_operation(AddPolicyOperation(policy=policy))
def update_policy(self, policy, attributes):
self._schedule_operation(UpdatePolicyOperation(policy=policy, attributes=attributes))
def remove_policy(self, policy):
self._schedule_operation(RemovePolicyOperation(policy=policy))
def set_default_presence_policy(self, policy):
self._schedule_operation(SetDefaultPresencePolicyOperation(policy=presrules.SubHandlingValue(policy)))
def set_default_dialog_policy(self, policy):
self._schedule_operation(SetDefaultDialogPolicyOperation(policy=dialogrules.SubHandlingValue(policy)))
def set_status_icon(self, icon):
self._schedule_operation(SetStatusIconOperation(icon=icon))
def set_offline_status(self, status):
self._schedule_operation(SetOfflineStatusOperation(status=status))
@run_in_twisted_thread
def _schedule_operation(self, operation):
self.journal.append(operation)
if self.transaction_level == 0:
self._save_journal()
self.command_channel.send(Command('update'))
def _run(self):
while True:
command = self.command_channel.wait()
try:
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
except:
self.command_proc = None
raise
# Command handlers
#
def _CH_start(self, command):
if self.state != 'stopped':
command.signal()
return
self.state = 'initializing'
self.xcap_subscriber = XCAPSubscriber(self.account)
notification_center = NotificationCenter()
notification_center.post_notification('XCAPManagerWillStart', sender=self)
notification_center.add_observer(self, sender=self.xcap_subscriber)
notification_center.add_observer(self, sender=SIPSimpleSettings(), name='CFGSettingsObjectDidChange')
self.xcap_subscriber.start()
self.command_channel.send(Command('initialize'))
notification_center.post_notification('XCAPManagerDidStart', sender=self)
command.signal()
def _CH_stop(self, command):
if self.state in ('stopped', 'terminated'):
command.signal()
return
notification_center = NotificationCenter()
notification_center.post_notification('XCAPManagerWillEnd', sender=self)
notification_center.remove_observer(self, sender=self.xcap_subscriber)
notification_center.remove_observer(self, sender=SIPSimpleSettings(), name='CFGSettingsObjectDidChange')
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.timer = None
self.xcap_subscriber.stop()
self.xcap_subscriber = None
self.client = None
self.state = 'stopped'
self._save_journal()
notification_center.post_notification('XCAPManagerDidEnd', sender=self)
command.signal()
def _CH_cleanup(self, command):
if self.state != 'stopped':
command.signal()
return
try:
self.storage.purge()
except XCAPStorageError:
pass
self.journal = []
self.state = 'terminated'
command.signal()
raise proc.ProcExit
def _CH_initialize(self, command):
self.state = 'initializing'
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.timer = None
if self.account.xcap.xcap_root:
self.client = XCAPClient(self.account.xcap.xcap_root, self.account.id, password=self.account.auth.password)
else:
try:
lookup = DNSLookup()
xcap_root = random.choice(lookup.lookup_xcap_server(self.account.uri).wait())
except DNSLookupError:
self.timer = self._schedule_command(60, Command('initialize', command.event))
return
else:
self.client = XCAPClient(xcap_root, self.account.id, password=self.account.auth.password)
try:
self.server_caps.fetch()
except XCAPError:
self.timer = self._schedule_command(60, Command('initialize', command.event))
return
else:
if self.server_caps.content is None:
# XCAP server must always return some content for xcap-caps
self.timer = self._schedule_command(60, Command('initialize', command.event))
return
if not set(self.server_caps.content.auids).issuperset(('resource-lists', 'rls-services', 'org.openmobilealliance.pres-rules')):
# Server must support at least resource-lists, rls-services and org.openmobilealliance.pres-rules
self.timer = self._schedule_command(3600, Command('initialize', command.event))
return
self.server_caps.initialize()
for document in self.documents:
document.initialize(self.server_caps.content)
notification_center = NotificationCenter()
notification_center.post_notification('XCAPManagerDidDiscoverServerCapabilities', sender=self, data=NotificationData(auids=self.server_caps.content.auids))
self.state = 'fetching'
self.command_channel.send(Command('fetch', documents=set(self.document_names)))
self.xcap_subscriber.activate()
def _CH_reload(self, command):
if self.state == 'terminated':
command.signal()
return
if '__id__' in command.modified:
try:
self.storage.purge()
except XCAPStorageError:
pass
self.storage = self.storage_factory(self.account.id)
self.journal = []
self._save_journal()
if {'__id__', 'xcap.xcap_root'}.intersection(command.modified):
for document in self.documents:
document.reset()
if self.state == 'stopped':
command.signal()
return
if {'__id__', 'auth.username', 'auth.password', 'xcap.xcap_root'}.intersection(command.modified):
self.state = 'initializing'
self.command_channel.send(Command('initialize'))
else:
self.xcap_subscriber.resubscribe()
command.signal()
def _CH_fetch(self, command):
if self.state not in ('insync', 'fetching'):
if self.not_executed_fetch is not None:
command.documents.update(self.not_executed_fetch.documents)
self.not_executed_fetch = command
return
if self.not_executed_fetch is not None:
command.documents.update(self.not_executed_fetch.documents)
self.not_executed_fetch = None
self.state = 'fetching'
if self.timer is not None and self.timer.active():
command.documents.update(self.timer.command.documents)
self.timer.cancel()
self.timer = None
try:
self._fetch_documents(command.documents)
except XCAPError:
self.timer = self._schedule_command(60, Command('fetch', command.event, documents=command.documents))
return
if not self.journal and self.last_fetch_time > datetime.fromtimestamp(0) and all(doc.fetch_time < command.timestamp for doc in self.documents):
self.last_fetch_time = datetime.utcnow()
self.state = 'insync'
return
else:
self.last_fetch_time = datetime.utcnow()
self.state = 'updating'
if not self.journal or type(self.journal[0]) is not NormalizeOperation:
self.journal.insert(0, NormalizeOperation())
self.command_channel.send(Command('update', command.event))
def _CH_update(self, command):
if self.state not in ('insync', 'updating'):
return
if self.transaction_level != 0:
return
self.state = 'updating'
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.timer = None
journal = self.journal[:]
for operation in (operation for operation in journal if not operation.applied):
handler = getattr(self, '_OH_%s' % operation.__class__.__name__)
try:
handler(operation)
except Exception:
# Error while applying operation, needs to be logged -Luci
log.exception()
operation.applied = True
api.sleep(0) # Operations are quite CPU intensive
try:
for document in (doc for doc in self.documents if doc.dirty and doc.supported):
document.update()
except FetchRequiredError:
for document in (doc for doc in self.documents if doc.dirty and doc.supported):
document.reset()
for operation in journal:
operation.applied = False
self.state = 'fetching'
self.command_channel.send(Command('fetch', documents=set(self.document_names))) # Try to fetch them all just in case
except XCAPError:
self.timer = self._schedule_command(60, Command('update'))
else:
del self.journal[:len(journal)]
if not self.journal:
self.state = 'insync'
if any(max(doc.update_time, doc.fetch_time) > self.last_update_time for doc in self.documents):
self._load_data()
self.last_update_time = datetime.utcnow()
command.signal()
if self.not_executed_fetch is not None:
self.command_channel.send(self.not_executed_fetch)
self.not_executed_fetch = None
self._save_journal()
# Operation handlers
#
def _OH_NormalizeOperation(self, operation):
# Normalize resource-lists
#
if self.resource_lists.content is None:
self.resource_lists.content = resourcelists.ResourceLists()
resource_lists = self.resource_lists.content
try:
oma_buddylist = resource_lists['oma_buddylist']
except KeyError:
oma_buddylist = resourcelists.List(name='oma_buddylist')
resource_lists.add(oma_buddylist)
try:
oma_grantedcontacts = resource_lists['oma_grantedcontacts']
except KeyError:
oma_grantedcontacts = resourcelists.List(name='oma_grantedcontacts')
resource_lists.add(oma_grantedcontacts)
try:
oma_blockedcontacts = resource_lists['oma_blockedcontacts']
except KeyError:
oma_blockedcontacts = resourcelists.List(name='oma_blockedcontacts')
resource_lists.add(oma_blockedcontacts)
try:
oma_allcontacts = resource_lists['oma_allcontacts']
except KeyError:
oma_allcontacts = resourcelists.List(name='oma_allcontacts')
oma_allcontacts.add(resourcelists.External(self.resource_lists.url + '/~~' + resource_lists.get_xpath(oma_buddylist)))
oma_allcontacts.add(resourcelists.External(self.resource_lists.url + '/~~' + resource_lists.get_xpath(oma_grantedcontacts)))
oma_allcontacts.add(resourcelists.External(self.resource_lists.url + '/~~' + resource_lists.get_xpath(oma_blockedcontacts)))
resource_lists.add(oma_allcontacts)
try:
dialog_grantedcontacts = resource_lists['dialog_grantedcontacts']
except KeyError:
dialog_grantedcontacts = resourcelists.List(name='dialog_grantedcontacts')
resource_lists.add(dialog_grantedcontacts)
try:
dialog_blockedcontacts = resource_lists['dialog_blockedcontacts']
except KeyError:
dialog_blockedcontacts = resourcelists.List(name='dialog_blockedcontacts')
resource_lists.add(dialog_blockedcontacts)
try:
sipsimple_presence_rls = resource_lists['sipsimple_presence_rls']
except KeyError:
sipsimple_presence_rls = resourcelists.List(name='sipsimple_presence_rls')
resource_lists.add(sipsimple_presence_rls)
try:
sipsimple_dialog_rls = resource_lists['sipsimple_dialog_rls']
except KeyError:
sipsimple_dialog_rls = resourcelists.List(name='sipsimple_dialog_rls')
resource_lists.add(sipsimple_dialog_rls)
try:
sipsimple_addressbook = resource_lists['sipsimple_addressbook']
except KeyError:
sipsimple_addressbook = resourcelists.List(name='sipsimple_addressbook')
resource_lists.add(sipsimple_addressbook)
for cls in (cls for cls in sipsimple_addressbook[IterateTypes] if cls not in (addressbook.Contact, addressbook.Group, addressbook.Policy)):
del sipsimple_addressbook[cls, All]
for cls in (cls for cls in oma_grantedcontacts[IterateTypes] if cls is not resourcelists.Entry):
del oma_grantedcontacts[cls, All]
for cls in (cls for cls in oma_blockedcontacts[IterateTypes] if cls is not resourcelists.Entry):
del oma_blockedcontacts[cls, All]
for cls in (cls for cls in dialog_grantedcontacts[IterateTypes] if cls is not resourcelists.Entry):
del dialog_grantedcontacts[cls, All]
for cls in (cls for cls in dialog_blockedcontacts[IterateTypes] if cls is not resourcelists.Entry):
del dialog_blockedcontacts[cls, All]
for cls in (cls for cls in sipsimple_presence_rls[IterateTypes] if cls is not resourcelists.Entry):
del sipsimple_presence_rls[cls, All]
for cls in (cls for cls in sipsimple_dialog_rls[IterateTypes] if cls is not resourcelists.Entry):
del sipsimple_dialog_rls[cls, All]
groups = ItemCollection(sipsimple_addressbook[addressbook.Group, IterateItems])
contacts = ItemCollection(sipsimple_addressbook[addressbook.Contact, IterateItems])
policies = ItemCollection(sipsimple_addressbook[addressbook.Policy, IterateItems])
for group, missing_id in [(group, missing_id) for group in groups for missing_id in (id for id in group.contacts if id not in contacts)]:
group.contacts.remove(missing_id)
all_contact_uris = set(uri.uri for contact in contacts for uri in contact.uris)
contact_allow_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.policy=='allow')
contact_block_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.policy=='block')
contact_allow_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.policy=='allow')
contact_block_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.policy=='block')
contact_subscribe_presence_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.presence.subscribe==True)
contact_subscribe_dialog_uris = set(uri.uri for contact in contacts for uri in contact.uris if contact.dialog.subscribe==True)
policy_allow_presence_uris = set(policy.uri for policy in policies if policy.presence.policy=='allow')
policy_block_presence_uris = set(policy.uri for policy in policies if policy.presence.policy=='block')
policy_allow_dialog_uris = set(policy.uri for policy in policies if policy.dialog.policy=='allow')
policy_block_dialog_uris = set(policy.uri for policy in policies if policy.dialog.policy=='block')
policy_subscribe_presence_uris = set(policy.uri for policy in policies if policy.presence.subscribe==True)
policy_subscribe_dialog_uris = set(policy.uri for policy in policies if policy.dialog.subscribe==True)
allowed_presence_uris = contact_allow_presence_uris - contact_block_presence_uris | policy_allow_presence_uris - policy_block_presence_uris - all_contact_uris
blocked_presence_uris = contact_block_presence_uris | policy_block_presence_uris - all_contact_uris
allowed_dialog_uris = contact_allow_dialog_uris - contact_block_dialog_uris | policy_allow_dialog_uris - policy_block_dialog_uris - all_contact_uris
blocked_dialog_uris = contact_block_dialog_uris | policy_block_dialog_uris - all_contact_uris
subscribe_presence_uris = contact_subscribe_presence_uris | policy_subscribe_presence_uris - all_contact_uris
subscribe_dialog_uris = contact_subscribe_dialog_uris | policy_subscribe_dialog_uris - all_contact_uris
if allowed_presence_uris != set(entry.uri for entry in oma_grantedcontacts):
oma_grantedcontacts.clear()
oma_grantedcontacts.update(resourcelists.Entry(uri) for uri in allowed_presence_uris)
if blocked_presence_uris != set(entry.uri for entry in oma_blockedcontacts):
oma_blockedcontacts.clear()
oma_blockedcontacts.update(resourcelists.Entry(uri) for uri in blocked_presence_uris)
if allowed_dialog_uris != set(entry.uri for entry in dialog_grantedcontacts):
dialog_grantedcontacts.clear()
dialog_grantedcontacts.update(resourcelists.Entry(uri) for uri in allowed_dialog_uris)
if blocked_dialog_uris != set(entry.uri for entry in dialog_blockedcontacts):
dialog_blockedcontacts.clear()
dialog_blockedcontacts.update(resourcelists.Entry(uri) for uri in blocked_dialog_uris)
if subscribe_presence_uris != set(entry.uri for entry in sipsimple_presence_rls):
sipsimple_presence_rls.clear()
sipsimple_presence_rls.update(resourcelists.Entry(uri) for uri in subscribe_presence_uris)
if subscribe_dialog_uris != set(entry.uri for entry in sipsimple_dialog_rls):
sipsimple_dialog_rls.clear()
sipsimple_dialog_rls.update(resourcelists.Entry(uri) for uri in subscribe_dialog_uris)
# Normalize rls-services
#
if self.rls_services.content is None:
self.rls_services.content = rlsservices.RLSServices()
rls_services = self.rls_services.content
rls_presence_uri = 'sip:' + self.rls_presence_uri
rls_dialog_uri = 'sip:' + self.rls_dialog_uri
rls_presence_list = rlsservices.ResourceList(self.resource_lists.url + '/~~' + resource_lists.get_xpath(sipsimple_presence_rls))
rls_dialog_list = rlsservices.ResourceList(self.resource_lists.url + '/~~' + resource_lists.get_xpath(sipsimple_dialog_rls))
try:
rls_presence_service = rls_services[rls_presence_uri]
except KeyError:
rls_presence_service = rlsservices.Service(rls_presence_uri, list=rls_presence_list, packages=['presence'])
rls_services.add(rls_presence_service)
else:
if rls_presence_service.list != rls_presence_list:
rls_presence_service.list = rls_presence_list
if list(rls_presence_service.packages) != ['presence']:
rls_presence_service.packages = ['presence']
try:
rls_dialog_service = rls_services[rls_dialog_uri]
except KeyError:
rls_dialog_service = rlsservices.Service(rls_dialog_uri, list=rls_dialog_list, packages=['dialog'])
rls_services.add(rls_dialog_service)
else:
if rls_dialog_service.list != rls_dialog_list:
rls_dialog_service.list = rls_dialog_list
if list(rls_dialog_service.packages) != ['dialog']:
rls_dialog_service.packages = ['dialog']
# Normalize pres-rules
#
if self.pres_rules.content is None:
self.pres_rules.content = presrules.PresRules()
def fix_subhandling(rule, valid_values=[]):
subhandling_elements = sorted((item for item in rule.actions if isinstance(item, presrules.SubHandling)), key=attrgetter('value.priority'))
if not subhandling_elements:
subhandling_elements = [presrules.SubHandling('block')] # spec specifies that missing SubHandling means block
rule.actions.update(subhandling_elements)
subhandling = subhandling_elements.pop()
for item in subhandling_elements: # remove any extraneous SubHandling elements
rule.actions.remove(item)
if subhandling.value not in valid_values:
subhandling.value = valid_values[0]
pres_rules = self.pres_rules.content
oma_grantedcontacts_ref = omapolicy.ExternalList([self.resource_lists.url + '/~~' + resource_lists.get_xpath(oma_grantedcontacts)])
oma_blockedcontacts_ref = omapolicy.ExternalList([self.resource_lists.url + '/~~' + resource_lists.get_xpath(oma_blockedcontacts)])
try:
wp_prs_grantedcontacts = pres_rules['wp_prs_grantedcontacts']
except KeyError:
wp_prs_grantedcontacts = commonpolicy.Rule('wp_prs_grantedcontacts', conditions=[oma_grantedcontacts_ref], actions=[presrules.SubHandling('allow')])
pres_rules.add(wp_prs_grantedcontacts)
else:
fix_subhandling(wp_prs_grantedcontacts, valid_values=['allow'])
if list(wp_prs_grantedcontacts.conditions) != [oma_grantedcontacts_ref]:
wp_prs_grantedcontacts.conditions = [oma_grantedcontacts_ref]
if wp_prs_grantedcontacts.transformations:
wp_prs_grantedcontacts.transformations = None
try:
wp_prs_blockedcontacts = pres_rules['wp_prs_blockedcontacts']
except KeyError:
wp_prs_blockedcontacts = commonpolicy.Rule('wp_prs_blockedcontacts', conditions=[oma_blockedcontacts_ref], actions=[presrules.SubHandling('polite-block')])
pres_rules.add(wp_prs_blockedcontacts)
else:
fix_subhandling(wp_prs_blockedcontacts, valid_values=['polite-block'])
if list(wp_prs_blockedcontacts.conditions) != [oma_blockedcontacts_ref]:
wp_prs_blockedcontacts.conditions = [oma_blockedcontacts_ref]
if wp_prs_blockedcontacts.transformations:
wp_prs_blockedcontacts.transformations = None
wp_prs_unlisted = pres_rules.get('wp_prs_unlisted', None)
wp_prs_allow_unlisted = pres_rules.get('wp_prs_allow_unlisted', None)
if wp_prs_unlisted is not None and wp_prs_allow_unlisted is not None:
pres_rules.remove(wp_prs_allow_unlisted)
wp_prs_allow_unlisted = None
wp_prs_unlisted_rule = wp_prs_unlisted or wp_prs_allow_unlisted
if wp_prs_unlisted_rule is None:
wp_prs_unlisted = commonpolicy.Rule('wp_prs_unlisted', conditions=[omapolicy.OtherIdentity()], actions=[presrules.SubHandling('confirm')])
pres_rules.add(wp_prs_unlisted)
wp_prs_unlisted_rule = wp_prs_unlisted
else:
if wp_prs_unlisted_rule is wp_prs_unlisted:
fix_subhandling(wp_prs_unlisted_rule, valid_values=['confirm', 'block', 'polite-block'])
else:
fix_subhandling(wp_prs_unlisted_rule, valid_values=['allow'])
if list(wp_prs_unlisted_rule.conditions) != [omapolicy.OtherIdentity()]:
wp_prs_unlisted_rule.conditions = [omapolicy.OtherIdentity()]
if wp_prs_unlisted_rule.transformations:
wp_prs_unlisted_rule.transformations = None
match_anonymous = omapolicy.AnonymousRequest()
try:
wp_prs_block_anonymous = pres_rules['wp_prs_block_anonymous']
except KeyError:
wp_prs_block_anonymous = commonpolicy.Rule('wp_prs_block_anonymous', conditions=[match_anonymous], actions=[presrules.SubHandling('block')])
pres_rules.add(wp_prs_block_anonymous)
else:
fix_subhandling(wp_prs_block_anonymous, valid_values=['block', 'polite-block'])
if list(wp_prs_block_anonymous.conditions) != [match_anonymous]:
wp_prs_block_anonymous.conditions = [match_anonymous]
if wp_prs_block_anonymous.transformations:
wp_prs_block_anonymous.transformations = None
match_self = commonpolicy.Identity([commonpolicy.IdentityOne('sip:' + self.account.id)])
try:
wp_prs_allow_own = pres_rules['wp_prs_allow_own']
except KeyError:
wp_prs_allow_own = commonpolicy.Rule('wp_prs_allow_own', conditions=[match_self], actions=[presrules.SubHandling('allow')])
pres_rules.add(wp_prs_allow_own)
else:
fix_subhandling(wp_prs_allow_own, valid_values=['allow'])
if list(wp_prs_allow_own.conditions) != [match_self]:
wp_prs_allow_own.conditions = [match_self]
if wp_prs_allow_own.transformations:
wp_prs_allow_own.transformations = None
# Remove any other rules
all_rule_names = set(pres_rules[IterateIDs])
known_rule_names = {'wp_prs_grantedcontacts', 'wp_prs_blockedcontacts', 'wp_prs_unlisted', 'wp_prs_allow_unlisted', 'wp_prs_block_anonymous', 'wp_prs_allow_own'}
for name in all_rule_names - known_rule_names:
del pres_rules[name]
del fix_subhandling
# Normalize dialog-rules
#
if self.dialog_rules.supported:
if self.dialog_rules.content is None:
self.dialog_rules.content = dialogrules.DialogRules()
elif self.dialog_rules.content.element.nsmap.get('dr') != dialogrules.namespace: # TODO: this elif branch should be removed in a later version as it is
self.dialog_rules.content = dialogrules.DialogRules() # only used to discard documents created with the old namespace. -Dan
def fix_subhandling(rule, valid_values=()):
subhandling_elements = sorted((item for item in rule.actions if isinstance(item, dialogrules.SubHandling)), key=attrgetter('value.priority'))
if not subhandling_elements:
subhandling_elements = [dialogrules.SubHandling('block')] # spec specifies that missing SubHandling means block
rule.actions.update(subhandling_elements)
subhandling = subhandling_elements.pop()
for item in subhandling_elements: # remove any extraneous SubHandling elements
rule.actions.remove(item)
if subhandling.value not in valid_values:
subhandling.value = valid_values[0]
dialog_rules = self.dialog_rules.content
dialog_grantedcontacts_ref = omapolicy.ExternalList([self.resource_lists.url + '/~~' + resource_lists.get_xpath(dialog_grantedcontacts)])
dialog_blockedcontacts_ref = omapolicy.ExternalList([self.resource_lists.url + '/~~' + resource_lists.get_xpath(dialog_blockedcontacts)])
try:
wp_dlg_grantedcontacts = dialog_rules['wp_dlg_grantedcontacts']
except KeyError:
wp_dlg_grantedcontacts = commonpolicy.Rule('wp_dlg_grantedcontacts', conditions=[dialog_grantedcontacts_ref], actions=[dialogrules.SubHandling('allow')])
dialog_rules.add(wp_dlg_grantedcontacts)
else:
fix_subhandling(wp_dlg_grantedcontacts, valid_values=['allow'])
if list(wp_dlg_grantedcontacts.conditions) != [dialog_grantedcontacts_ref]:
wp_dlg_grantedcontacts.conditions = [dialog_grantedcontacts_ref]
if wp_dlg_grantedcontacts.transformations:
wp_dlg_grantedcontacts.transformations = None
try:
wp_dlg_blockedcontacts = dialog_rules['wp_dlg_blockedcontacts']
except KeyError:
wp_dlg_blockedcontacts = commonpolicy.Rule('wp_dlg_blockedcontacts', conditions=[dialog_blockedcontacts_ref], actions=[dialogrules.SubHandling('polite-block')])
dialog_rules.add(wp_dlg_blockedcontacts)
else:
fix_subhandling(wp_dlg_blockedcontacts, valid_values=['polite-block'])
if list(wp_dlg_blockedcontacts.conditions) != [dialog_blockedcontacts_ref]:
wp_dlg_blockedcontacts.conditions = [dialog_blockedcontacts_ref]
if wp_dlg_blockedcontacts.transformations:
wp_dlg_blockedcontacts.transformations = None
wp_dlg_unlisted = dialog_rules.get('wp_dlg_unlisted', None)
wp_dlg_allow_unlisted = dialog_rules.get('wp_dlg_allow_unlisted', None)
if wp_dlg_unlisted is not None and wp_dlg_allow_unlisted is not None:
dialog_rules.remove(wp_dlg_allow_unlisted)
wp_dlg_allow_unlisted = None
wp_dlg_unlisted_rule = wp_dlg_unlisted or wp_dlg_allow_unlisted
if wp_dlg_unlisted_rule is None:
wp_dlg_unlisted = commonpolicy.Rule('wp_dlg_unlisted', conditions=[omapolicy.OtherIdentity()], actions=[dialogrules.SubHandling('confirm')])
dialog_rules.add(wp_dlg_unlisted)
wp_dlg_unlisted_rule = wp_dlg_unlisted
else:
if wp_dlg_unlisted_rule is wp_dlg_unlisted:
fix_subhandling(wp_dlg_unlisted_rule, valid_values=['confirm', 'block', 'polite-block'])
else:
fix_subhandling(wp_dlg_unlisted_rule, valid_values=['allow'])
if list(wp_dlg_unlisted_rule.conditions) != [omapolicy.OtherIdentity()]:
wp_dlg_unlisted_rule.conditions = [omapolicy.OtherIdentity()]
if wp_dlg_unlisted_rule.transformations:
wp_dlg_unlisted_rule.transformations = None
match_anonymous = omapolicy.AnonymousRequest()
try:
wp_dlg_block_anonymous = dialog_rules['wp_dlg_block_anonymous']
except KeyError:
wp_dlg_block_anonymous = commonpolicy.Rule('wp_dlg_block_anonymous', conditions=[match_anonymous], actions=[dialogrules.SubHandling('block')])
dialog_rules.add(wp_dlg_block_anonymous)
else:
fix_subhandling(wp_dlg_block_anonymous, valid_values=['block', 'polite-block'])
if list(wp_dlg_block_anonymous.conditions) != [match_anonymous]:
wp_dlg_block_anonymous.conditions = [match_anonymous]
if wp_dlg_block_anonymous.transformations:
wp_dlg_block_anonymous.transformations = None
match_self = commonpolicy.Identity([commonpolicy.IdentityOne('sip:' + self.account.id)])
try:
wp_dlg_allow_own = dialog_rules['wp_dlg_allow_own']
except KeyError:
wp_dlg_allow_own = commonpolicy.Rule('wp_dlg_allow_own', conditions=[match_self], actions=[dialogrules.SubHandling('allow')])
dialog_rules.add(wp_dlg_allow_own)
else:
fix_subhandling(wp_dlg_allow_own, valid_values=['allow'])
if list(wp_dlg_allow_own.conditions) != [match_self]:
wp_dlg_allow_own.conditions = [match_self]
if wp_dlg_allow_own.transformations:
wp_dlg_allow_own.transformations = None
# Remove any other rules
all_rule_names = set(dialog_rules[IterateIDs])
known_rule_names = {'wp_dlg_grantedcontacts', 'wp_dlg_blockedcontacts', 'wp_dlg_unlisted', 'wp_dlg_allow_unlisted', 'wp_dlg_block_anonymous', 'wp_dlg_allow_own'}
for name in all_rule_names - known_rule_names:
del dialog_rules[name]
# Normalize status icon
#
if self.status_icon.supported and self.status_icon.content is not None:
content = self.status_icon.content
if None in (content.encoding, content.mime_type) or content.encoding.value.lower() != 'base64' or content.mime_type.value.lower() not in Icon.__mimetypes__:
self.status_icon.content = None
self.status_icon.dirty = True
def _OH_AddContactOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
contact = operation.contact
presence_handling = addressbook.PresenceHandling(contact.presence.policy, contact.presence.subscribe)
dialog_handling = addressbook.DialogHandling(contact.dialog.policy, contact.dialog.subscribe)
xml_contact = addressbook.Contact(contact.id, contact.name, presence_handling=presence_handling, dialog_handling=dialog_handling)
for uri in contact.uris:
contact_uri = addressbook.ContactURI(uri.id, uri.uri, uri.type)
contact_uri.attributes = addressbook.ContactURI.attributes.type(uri.attributes)
xml_contact.uris.add(contact_uri)
xml_contact.uris.default = contact.uris.default
xml_contact.attributes = addressbook.Contact.attributes.type(contact.attributes)
sipsimple_addressbook.add(xml_contact)
def _OH_UpdateContactOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
contact = sipsimple_addressbook[addressbook.Contact, operation.contact.id]
except KeyError:
return
attributes = dict(operation.attributes)
attributes.pop('id', None) # id is never modified
attributes.pop('uris', None) # uris are modified using dedicated methods
if 'name' in attributes:
contact.name = attributes.pop('name')
if 'uris.default' in attributes:
contact.uris.default = attributes.pop('uris.default')
if 'presence.policy' in attributes:
contact.presence.policy = attributes.pop('presence.policy')
if 'presence.subscribe' in attributes:
contact.presence.subscribe = attributes.pop('presence.subscribe')
if 'dialog.policy' in attributes:
contact.dialog.policy = attributes.pop('dialog.policy')
if 'dialog.subscribe' in attributes:
contact.dialog.subscribe = attributes.pop('dialog.subscribe')
if contact.attributes is None:
contact.attributes = addressbook.Contact.attributes.type()
contact.attributes.update(attributes)
def _OH_RemoveContactOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
for group in (group for group in sipsimple_addressbook[addressbook.Group, IterateItems] if operation.contact.id in group.contacts):
group.contacts.remove(operation.contact.id)
try:
del sipsimple_addressbook[addressbook.Contact, operation.contact.id]
except KeyError:
pass
def _OH_AddContactURIOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
contact = sipsimple_addressbook[addressbook.Contact, operation.contact.id]
except KeyError:
return
uri = addressbook.ContactURI(operation.uri.id, operation.uri.uri, operation.uri.type)
uri.attributes = addressbook.ContactURI.attributes.type(operation.uri.attributes)
contact.uris.add(uri)
def _OH_UpdateContactURIOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
contact = sipsimple_addressbook[addressbook.Contact, operation.contact.id]
uri = contact.uris[operation.uri.id]
except KeyError:
return
attributes = dict(operation.attributes)
attributes.pop('id', None) # id is never modified
if 'uri' in attributes:
uri.uri = attributes.pop('uri')
if 'type' in attributes:
uri.type = attributes.pop('type')
if uri.attributes is None:
uri.attributes = addressbook.ContactURI.attributes.type()
uri.attributes.update(attributes)
def _OH_RemoveContactURIOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
contact = sipsimple_addressbook[addressbook.Contact, operation.contact.id]
del contact.uris[operation.uri.id]
except KeyError:
pass
def _OH_AddGroupOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
group = addressbook.Group(operation.group.id, operation.group.name, [contact.id for contact in operation.group.contacts])
group.attributes = addressbook.Group.attributes.type(operation.group.attributes)
sipsimple_addressbook.add(group)
def _OH_UpdateGroupOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
group = sipsimple_addressbook[addressbook.Group, operation.group.id]
except KeyError:
return
attributes = dict(operation.attributes)
attributes.pop('id', None) # id is never modified
attributes.pop('contacts', None) # contacts are added/removed using dedicated methods
if 'name' in attributes:
group.name = attributes.pop('name')
if group.attributes is None:
group.attributes = addressbook.Group.attributes.type()
group.attributes.update(attributes)
def _OH_RemoveGroupOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
del sipsimple_addressbook[addressbook.Group, operation.group.id]
except KeyError:
pass
def _OH_AddGroupMemberOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
group = sipsimple_addressbook[addressbook.Group, operation.group.id]
except KeyError:
return
if operation.contact.id in group.contacts:
return
group.contacts.add(operation.contact.id)
def _OH_RemoveGroupMemberOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
group = sipsimple_addressbook[addressbook.Group, operation.group.id]
group.contacts.remove(operation.contact.id)
except KeyError:
return
def _OH_AddPolicyOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
presence_handling = addressbook.PresenceHandling(operation.policy.presence.policy, operation.policy.presence.subscribe)
dialog_handling = addressbook.DialogHandling(operation.policy.dialog.policy, operation.policy.dialog.subscribe)
policy = addressbook.Policy(operation.policy.id, operation.policy.uri, operation.policy.name, presence_handling=presence_handling, dialog_handling=dialog_handling)
policy.attributes = addressbook.Policy.attributes.type(operation.policy.attributes)
sipsimple_addressbook.add(policy)
def _OH_UpdatePolicyOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
policy = sipsimple_addressbook[addressbook.Policy, operation.policy.id]
except KeyError:
return
attributes = dict(operation.attributes)
attributes.pop('id', None) # id is never modified
if 'uri' in attributes:
policy.uri = attributes.pop('uri')
if 'name' in attributes:
policy.name = attributes.pop('name')
if 'presence.policy' in attributes:
policy.presence.policy = attributes.pop('presence.policy')
if 'presence.subscribe' in attributes:
policy.presence.subscribe = attributes.pop('presence.subscribe')
if 'dialog.policy' in attributes:
policy.dialog.policy = attributes.pop('dialog.policy')
if 'dialog.subscribe' in attributes:
policy.dialog.subscribe = attributes.pop('dialog.subscribe')
if policy.attributes is None:
policy.attributes = addressbook.Policy.attributes.type()
policy.attributes.update(attributes)
def _OH_RemovePolicyOperation(self, operation):
sipsimple_addressbook = self.resource_lists.content['sipsimple_addressbook']
try:
del sipsimple_addressbook[addressbook.Policy, operation.policy.id]
except KeyError:
pass
def _OH_SetStatusIconOperation(self, operation):
if not self.status_icon.supported:
return
icon = operation.icon
if icon is None or not icon.data:
self.status_icon.dirty = self.status_icon.content is not None
self.status_icon.content = None
else:
content = prescontent.PresenceContent(data=base64.encodestring(icon.data), mime_type=icon.mime_type, encoding='base64', description=icon.description)
if self.status_icon.content == content:
return
self.status_icon.content = content
def _OH_SetOfflineStatusOperation(self, operation):
pidf = operation.status.pidf if operation.status is not None else None
if not self.pidf_manipulation.supported or pidf == self.pidf_manipulation.content:
return
self.pidf_manipulation.content = pidf
self.pidf_manipulation.dirty = True
def _OH_SetDefaultPresencePolicyOperation(self, operation):
pres_rules = self.pres_rules.content
if operation.policy == 'allow':
rule_id, other_rule_id = 'wp_prs_allow_unlisted', 'wp_prs_unlisted'
else:
rule_id, other_rule_id = 'wp_prs_unlisted', 'wp_prs_allow_unlisted'
try:
del pres_rules[other_rule_id]
except KeyError:
rule = pres_rules[rule_id]
subhandling = next(item for item in rule.actions if isinstance(item, presrules.SubHandling))
subhandling.value = operation.policy
else:
rule = commonpolicy.Rule(rule_id, conditions=[omapolicy.OtherIdentity()], actions=[presrules.SubHandling(operation.policy)])
pres_rules.add(rule)
def _OH_SetDefaultDialogPolicyOperation(self, operation):
if not self.dialog_rules.supported:
return
dialog_rules = self.dialog_rules.content
if operation.policy == 'allow':
rule_id, other_rule_id = 'wp_dlg_allow_unlisted', 'wp_dlg_unlisted'
else:
rule_id, other_rule_id = 'wp_dlg_unlisted', 'wp_dlg_allow_unlisted'
try:
del dialog_rules[other_rule_id]
except KeyError:
rule = dialog_rules[rule_id]
subhandling = next(item for item in rule.actions if isinstance(item, dialogrules.SubHandling))
subhandling.value = operation.policy
else:
rule = commonpolicy.Rule(rule_id, conditions=[omapolicy.OtherIdentity()], actions=[dialogrules.SubHandling(operation.policy)])
dialog_rules.add(rule)
# Notification handlers
#
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if {'__id__', 'xcap.xcap_root', 'auth.username', 'auth.password', 'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified):
self.command_channel.send(Command('reload', modified=notification.data.modified))
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
if self.account.enabled and 'xcap.enabled' in notification.data.modified:
if self.account.xcap.enabled:
self.start()
else:
self.stop()
def _NH_CFGSettingsObjectWasDeleted(self, notification):
notification.center.remove_observer(self, sender=self.account, name='CFGSettingsObjectDidChange')
notification.center.remove_observer(self, sender=self.account, name='CFGSettingsObjectWasDeleted')
self.command_channel.send(Command('stop'))
self.command_channel.send(Command('cleanup'))
def _NH_XCAPSubscriptionDidStart(self, notification):
self.command_channel.send(Command('fetch', documents=set(self.document_names)))
def _NH_XCAPSubscriptionDidFail(self, notification):
self.command_channel.send(Command('fetch', documents=set(self.document_names)))
def _NH_XCAPSubscriptionGotNotify(self, notification):
if notification.data.content_type == xcapdiff.XCAPDiffDocument.content_type:
try:
xcap_diff = xcapdiff.XCAPDiffDocument.parse(notification.data.body)
except ParserError:
self.command_channel.send(Command('fetch', documents=set(self.document_names)))
else:
applications = set(child.selector.auid for child in xcap_diff if isinstance(child, xcapdiff.Document))
documents = set(document.name for document in self.documents if document.application in applications)
self.command_channel.send(Command('fetch', documents=documents))
def _load_data(self):
addressbook = Addressbook.from_payload(self.resource_lists.content['sipsimple_addressbook'])
default_presence_rule = self.pres_rules.content.get('wp_prs_unlisted', None) or self.pres_rules.content.get('wp_prs_allow_unlisted', None)
if self.dialog_rules.supported:
default_dialog_rule = self.dialog_rules.content.get('wp_dlg_unlisted', None) or self.dialog_rules.content.get('wp_dlg_allow_unlisted', None)
else:
default_dialog_rule = None
presence_rules = PresenceRules.from_payload(default_presence_rule)
dialog_rules = DialogRules.from_payload(default_dialog_rule)
if self.status_icon.supported and self.status_icon.content:
status_icon = Icon.from_payload(self.status_icon.content)
status_icon.url = self.status_icon.url
status_icon.etag = self.status_icon.etag
else:
status_icon = None
if self.pidf_manipulation.supported and self.pidf_manipulation.content:
offline_status = OfflineStatus(self.pidf_manipulation.content)
else:
offline_status = None
data=NotificationData(addressbook=addressbook, presence_rules=presence_rules, dialog_rules=dialog_rules, status_icon=status_icon, offline_status=offline_status)
NotificationCenter().post_notification('XCAPManagerDidReloadData', sender=self, data=data)
def _fetch_documents(self, documents):
workers = [Worker.spawn(document.fetch) for document in (doc for doc in self.documents if doc.name in documents and doc.supported)]
try:
while workers:
worker = workers.pop()
worker.wait()
finally:
for worker in workers:
worker.wait_ex()
def _save_journal(self):
try:
self.storage.save('journal', pickle.dumps(self.journal))
except XCAPStorageError:
pass
def _schedule_command(self, timeout, command):
from twisted.internet import reactor
timer = reactor.callLater(timeout, self.command_channel.send, command)
timer.command = command
return timer
class XCAPTransaction(object):
def __init__(self, xcap_manager):
self.xcap_manager = xcap_manager
def __enter__(self):
self.xcap_manager.start_transaction()
return self
def __exit__(self, type, value, traceback):
self.xcap_manager.commit_transaction()
diff --git a/sipsimple/payloads/__init__.py b/sipsimple/payloads/__init__.py
index 8039a111..5b944130 100644
--- a/sipsimple/payloads/__init__.py
+++ b/sipsimple/payloads/__init__.py
@@ -1,1204 +1,1201 @@
__all__ = ['ParserError',
'BuilderError',
'ValidationError',
'IterateTypes',
'IterateIDs',
'IterateItems',
'All',
'parse_qname',
'XMLDocument',
'XMLAttribute',
'XMLElementID',
'XMLElementChild',
'XMLElementChoiceChild',
'XMLStringChoiceChild',
'XMLElement',
'XMLRootElement',
'XMLSimpleElement',
'XMLStringElement',
'XMLLocalizedStringElement',
'XMLBooleanElement',
'XMLByteElement',
'XMLUnsignedByteElement',
'XMLShortElement',
'XMLUnsignedShortElement',
'XMLIntElement',
'XMLUnsignedIntElement',
'XMLLongElement',
'XMLUnsignedLongElement',
'XMLIntegerElement',
'XMLPositiveIntegerElement',
'XMLNegativeIntegerElement',
'XMLNonNegativeIntegerElement',
'XMLNonPositiveIntegerElement',
'XMLDecimalElement',
'XMLDateTimeElement',
'XMLAnyURIElement',
'XMLEmptyElement',
'XMLEmptyElementRegistryType',
'XMLListElement',
'XMLListRootElement',
'XMLStringListElement']
import os
import sys
import urllib.request, urllib.parse, urllib.error
from collections import defaultdict, deque
from copy import deepcopy
from decimal import Decimal
from itertools import chain
from weakref import WeakValueDictionary
from application.python import Null
from application.python.descriptor import classproperty
from application.python.types import MarkerType
from application.python.weakref import weakobjectmap
from lxml import etree
from sipsimple.payloads.datatypes import Boolean, Byte, UnsignedByte, Short, UnsignedShort, Int, UnsignedInt, Long, UnsignedLong
from sipsimple.payloads.datatypes import PositiveInteger, NegativeInteger, NonNegativeInteger, NonPositiveInteger, DateTime, AnyURI
from sipsimple.util import All
## Exceptions
class ParserError(Exception): pass
class BuilderError(Exception): pass
class ValidationError(ParserError): pass
## Markers
class IterateTypes(metaclass=MarkerType): pass
class IterateIDs(metaclass=MarkerType): pass
class IterateItems(metaclass=MarkerType): pass
class StoredAttribute(metaclass=MarkerType): pass
## Utilities
def parse_qname(qname):
if qname[0] == '{':
qname = qname[1:]
return qname.split('}')
else:
return None, qname
## XMLDocument
class XMLDocumentType(type):
def __init__(cls, name, bases, dct):
cls.nsmap = {}
cls.schema_map = {}
cls.element_map = {}
cls.root_element = None
cls.schema = None
cls.parser = None
for base in reversed(bases):
if hasattr(base, 'element_map'):
cls.element_map.update(base.element_map)
if hasattr(base, 'schema_map'):
cls.schema_map.update(base.schema_map)
if hasattr(base, 'nsmap'):
cls.nsmap.update(base.nsmap)
cls._update_schema()
def __setattr__(cls, name, value):
if name == 'schema_path':
if cls is not XMLDocument:
raise AttributeError("%s can only be changed on XMLDocument" % name)
super(XMLDocumentType, cls).__setattr__(name, value)
def update_schema(document_class):
document_class._update_schema()
for document_subclass in document_class.__subclasses__():
update_schema(document_subclass)
update_schema(XMLDocument)
else:
super(XMLDocumentType, cls).__setattr__(name, value)
def _update_schema(cls):
if cls.schema_map:
location_map = {ns: urllib.parse.quote(os.path.abspath(os.path.join(cls.schema_path, schema_file)).replace('\\', '//')) for ns, schema_file in list(cls.schema_map.items())}
schema = """
%s
""" % '\r\n'.join('' % (namespace, schema_location) for namespace, schema_location in list(location_map.items()))
cls.schema = etree.XMLSchema(etree.XML(schema))
cls.parser = etree.XMLParser(schema=cls.schema, remove_blank_text=True)
else:
cls.schema = None
cls.parser = etree.XMLParser(remove_blank_text=True)
class XMLDocument(object, metaclass=XMLDocumentType):
encoding = 'UTF-8'
content_type = None
schema_path = os.path.join(os.path.dirname(__file__), 'xml-schemas')
@classmethod
def parse(cls, document):
try:
-# if isinstance(document, str):
-# xml = etree.XML(document, parser=cls.parser)
-# elif isinstance(document, bytes):
if isinstance(document, bytes):
- xml = etree.XML(document.decode('utf-8'), parser=cls.parser)
+ xml = etree.XML(document, parser=cls.parser)
else:
xml = etree.parse(document, parser=cls.parser).getroot()
if cls.schema is not None:
cls.schema.assertValid(xml)
return cls.root_element.from_element(xml, xml_document=cls)
except (etree.DocumentInvalid, etree.XMLSyntaxError, ValueError) as e:
raise ParserError(str(e))
@classmethod
def build(cls, root_element, encoding=None, pretty_print=False, validate=True):
if type(root_element) is not cls.root_element:
raise TypeError("can only build XML documents from root elements of type %s" % cls.root_element.__name__)
element = root_element.to_element()
if validate and cls.schema is not None:
cls.schema.assertValid(element)
# Cleanup namespaces and move element NS mappings to the global scope.
normalized_element = etree.Element(element.tag, attrib=element.attrib, nsmap=dict(chain(iter(list(element.nsmap.items())), iter(list(cls.nsmap.items())))))
normalized_element.text = element.text
normalized_element.tail = element.tail
normalized_element.extend(deepcopy(child) for child in element)
etree.cleanup_namespaces(normalized_element)
return etree.tostring(normalized_element, encoding=encoding or cls.encoding, method='xml', xml_declaration=True, pretty_print=pretty_print)
@classmethod
def create(cls, build_kw={}, **kw):
return cls.build(cls.root_element(**kw), **build_kw)
@classmethod
def register_element(cls, xml_class):
cls.element_map[xml_class.qname] = xml_class
for child in cls.__subclasses__():
child.register_element(xml_class)
@classmethod
def get_element(cls, qname, default=None):
return cls.element_map.get(qname, default)
@classmethod
def register_namespace(cls, namespace, prefix=None, schema=None):
if prefix in cls.nsmap:
raise ValueError("prefix %s is already registered in %s" % (prefix, cls.__name__))
if namespace in iter(list(cls.nsmap.values())):
raise ValueError("namespace %s is already registered in %s" % (namespace, cls.__name__))
cls.nsmap[prefix] = namespace
if schema is not None:
cls.schema_map[namespace] = schema
cls._update_schema()
for child in cls.__subclasses__():
child.register_namespace(namespace, prefix, schema)
@classmethod
def unregister_namespace(cls, namespace):
try:
prefix = next((prefix for prefix in cls.nsmap if cls.nsmap[prefix]==namespace))
except StopIteration:
raise KeyError("namespace %s is not registered in %s" % (namespace, cls.__name__))
del cls.nsmap[prefix]
schema = cls.schema_map.pop(namespace, None)
if schema is not None:
cls._update_schema()
for child in cls.__subclasses__():
try:
child.unregister_namespace(namespace)
except KeyError:
pass
## Children descriptors
class XMLAttribute(object):
def __init__(self, name, xmlname=None, type=str, default=None, required=False, test_equal=True, onset=None, ondel=None):
self.name = name
self.xmlname = xmlname or name
self.type = type
self.default = default
self.__xmlparse__ = getattr(type, '__xmlparse__', lambda value: value)
self.__xmlbuild__ = getattr(type, '__xmlbuild__', str)
self.required = required
self.test_equal = test_equal
self.onset = onset
self.ondel = ondel
self.values = weakobjectmap()
def __get__(self, obj, objtype):
if obj is None:
return self
try:
return self.values[obj]
except KeyError:
value = self.values.setdefault(obj, self.default)
if value is not None:
obj.element.set(self.xmlname, self.build(value))
return value
def __set__(self, obj, value):
if value is not None and not isinstance(value, self.type):
value = self.type(value)
old_value = self.values.get(obj, self.default)
if value == old_value:
return
if value is not None:
obj.element.set(self.xmlname, self.build(value))
else:
obj.element.attrib.pop(self.xmlname, None)
self.values[obj] = value
obj.__dirty__ = True
if self.onset:
self.onset(obj, self, value)
def __delete__(self, obj):
obj.element.attrib.pop(self.xmlname, None)
try:
value = self.values.pop(obj)
except KeyError:
pass
else:
if value != self.default:
obj.__dirty__ = True
if self.ondel:
self.ondel(obj, self)
def parse(self, xmlvalue):
return self.__xmlparse__(xmlvalue)
def build(self, value):
return self.__xmlbuild__(value)
class XMLElementID(XMLAttribute):
"""An XMLAttribute that represents the ID of an element (immutable)."""
def __set__(self, obj, value):
if obj in self.values:
raise AttributeError("An XML element ID cannot be changed")
super(XMLElementID, self).__set__(obj, value)
def __delete__(self, obj):
raise AttributeError("An XML element ID cannot be deleted")
class XMLElementChild(object):
def __init__(self, name, type, required=False, test_equal=True, onset=None, ondel=None):
self.name = name
self.type = type
self.required = required
self.test_equal = test_equal
self.onset = onset
self.ondel = ondel
self.values = weakobjectmap()
def __get__(self, obj, objtype):
if obj is None:
return self
try:
return self.values[obj]
except KeyError:
return None
def __set__(self, obj, value):
if value is not None and not isinstance(value, self.type):
value = self.type(value)
same_value = False
old_value = self.values.get(obj)
if value is old_value:
return
elif value is not None and value == old_value:
value.__dirty__ = old_value.__dirty__
same_value = True
if old_value is not None:
obj.element.remove(old_value.element)
if value is not None:
obj._insert_element(value.element)
self.values[obj] = value
if not same_value:
obj.__dirty__ = True
if self.onset:
self.onset(obj, self, value)
def __delete__(self, obj):
try:
old_value = self.values.pop(obj)
except KeyError:
pass
else:
if old_value is not None:
obj.element.remove(old_value.element)
obj.__dirty__ = True
if self.ondel:
self.ondel(obj, self)
class XMLElementChoiceChildWrapper(object):
__slots__ = ('descriptor', 'type')
def __init__(self, descriptor, type):
self.descriptor = descriptor
self.type = type
def __getattribute__(self, name):
if name in ('descriptor', 'type', 'register_extension', 'unregister_extension'):
return super(XMLElementChoiceChildWrapper, self).__getattribute__(name)
else:
return self.descriptor.__getattribute__(name)
def __setattr__(self, name, value):
if name in ('descriptor', 'type'):
super(XMLElementChoiceChildWrapper, self).__setattr__(name, value)
else:
setattr(self.descriptor, name, value)
def __dir__(self):
return dir(self.descriptor) + ['descriptor', 'type', 'register_extension', 'unregister_extension']
def register_extension(self, type):
if self.extension_type is None:
raise ValueError("The %s XML choice element of %s does not support extensions" % (self.name, self.type.__name__))
if not issubclass(type, XMLElement) or not issubclass(type, self.extension_type):
raise TypeError("type is not a subclass of XMLElement and/or %s: %s" % (self.extension_type.__name__, type.__name__))
if type in self.types:
raise ValueError("%s is already registered as a choice extension" % type.__name__)
self.types.add(type)
self.type._xml_children_qname_map[type.qname] = (self.descriptor, type)
for child_class in self.type.__subclasses__():
child_class._xml_children_qname_map[type.qname] = (self.descriptor, type)
def unregister_extension(self, type):
if self.extension_type is None:
raise ValueError("The %s XML choice element of %s does not support extensions" % (self.name, self.type.__name__))
try:
self.types.remove(type)
except ValueError:
raise ValueError("%s is not a registered choice extension on %s" % (type.__name__, self.type.__name__))
del self.type._xml_children_qname_map[type.qname]
for child_class in self.type.__subclasses__():
del child_class._xml_children_qname_map[type.qname]
class XMLElementChoiceChild(object):
def __init__(self, name, types, extension_type=None, required=False, test_equal=True, onset=None, ondel=None):
self.name = name
self.types = set(types)
self.extension_type = extension_type
self.required = required
self.test_equal = test_equal
self.onset = onset
self.ondel = ondel
self.values = weakobjectmap()
def __get__(self, obj, objtype):
if obj is None:
return XMLElementChoiceChildWrapper(self, objtype)
try:
return self.values[obj]
except KeyError:
return None
def __set__(self, obj, value):
if value is not None and type(value) not in self.types:
raise TypeError("%s is not an acceptable type for %s" % (value.__class__.__name__, obj.__class__.__name__))
same_value = False
old_value = self.values.get(obj)
if value is old_value:
return
elif value is not None and value == old_value:
value.__dirty__ = old_value.__dirty__
same_value = True
if old_value is not None:
obj.element.remove(old_value.element)
if value is not None:
obj._insert_element(value.element)
self.values[obj] = value
if not same_value:
obj.__dirty__ = True
if self.onset:
self.onset(obj, self, value)
def __delete__(self, obj):
try:
old_value = self.values.pop(obj)
except KeyError:
pass
else:
if old_value is not None:
obj.element.remove(old_value.element)
obj.__dirty__ = True
if self.ondel:
self.ondel(obj, self)
class XMLStringChoiceChild(XMLElementChoiceChild):
"""
A choice between keyword strings from a registry, custom strings from the
other type and custom extensions. This descriptor will accept and return
strings instead of requiring XMLElement instances for the values in the
registry and the other type. Check XMLEmptyElementRegistryType for a
metaclass for building registries of XMLEmptyElement classes for keywords.
"""
def __init__(self, name, registry=None, other_type=None, extension_type=None):
self.registry = registry
self.other_type = other_type
self.extension_type = extension_type
types = registry.classes if registry is not None else ()
types += (other_type,) if other_type is not None else ()
super(XMLStringChoiceChild, self).__init__(name, types, extension_type=extension_type, required=True, test_equal=True)
def __get__(self, obj, objtype):
value = super(XMLStringChoiceChild, self).__get__(obj, objtype)
if obj is None or objtype is StoredAttribute or value is None or isinstance(value, self.extension_type or ()):
return value
else:
return str(value)
def __set__(self, obj, value):
if isinstance(value, str):
if self.registry is not None and value in self.registry.names:
value = self.registry.class_map[value]()
elif self.other_type is not None:
value = self.other_type.from_string(value)
super(XMLStringChoiceChild, self).__set__(obj, value)
## XMLElement base classes
class XMLElementBase(object):
"""
This class is used as a common ancestor for XML elements and provides
the means for super() to find at least dummy implementations for the
methods that are supposed to be implemented by subclasses, even when
they are not implemented by any other ancestor class. This is necessary
in order to simplify access to these methods when multiple inheritance
is involved and none or only some of the classes implement them.
The methods declared here should to be implemented in subclasses as
necessary.
"""
def __get_dirty__(self):
return False
def __set_dirty__(self, dirty):
return
def _build_element(self):
return
def _parse_element(self, element):
return
class XMLElementType(type):
def __init__(cls, name, bases, dct):
super(XMLElementType, cls).__init__(name, bases, dct)
# set dictionary of xml attributes and xml child elements
cls._xml_attributes = {}
cls._xml_element_children = {}
cls._xml_children_qname_map = {}
for base in reversed(bases):
if hasattr(base, '_xml_attributes'):
cls._xml_attributes.update(base._xml_attributes)
if hasattr(base, '_xml_element_children') and hasattr(base, '_xml_children_qname_map'):
cls._xml_element_children.update(base._xml_element_children)
cls._xml_children_qname_map.update(base._xml_children_qname_map)
for name, value in list(dct.items()):
if isinstance(value, XMLElementID):
if cls._xml_id is not None:
raise AttributeError("Only one XMLElementID attribute can be defined in the %s class" % cls.__name__)
cls._xml_id = value
cls._xml_attributes[value.name] = value
elif isinstance(value, XMLAttribute):
cls._xml_attributes[value.name] = value
elif isinstance(value, XMLElementChild):
cls._xml_element_children[value.name] = value
cls._xml_children_qname_map[value.type.qname] = (value, value.type)
elif isinstance(value, XMLElementChoiceChild):
cls._xml_element_children[value.name] = value
for type in value.types:
cls._xml_children_qname_map[type.qname] = (value, type)
# register class in its XMLDocument
if cls._xml_document is not None:
cls._xml_document.register_element(cls)
class XMLElement(XMLElementBase, metaclass=XMLElementType):
_xml_tag = None # To be defined in subclass
_xml_namespace = None # To be defined in subclass
_xml_document = None # To be defined in subclass
_xml_extension_type = None # Can be defined in subclass
_xml_id = None # Can be defined in subclass, or will be set by the metaclass to the XMLElementID attribute (if present)
_xml_children_order = {} # Can be defined in subclass
# dynamically generated
_xml_attributes = {}
_xml_element_children = {}
_xml_children_qname_map = {}
qname = classproperty(lambda cls: '{%s}%s' % (cls._xml_namespace, cls._xml_tag))
def __init__(self):
self.element = etree.Element(self.qname, nsmap=self._xml_document.nsmap)
self.__dirty__ = True
def __get_dirty__(self):
return (self.__dict__['__dirty__']
or any(child.__dirty__ for child in (getattr(self, name) for name in self._xml_element_children) if child is not None)
or super(XMLElement, self).__get_dirty__())
def __set_dirty__(self, dirty):
super(XMLElement, self).__set_dirty__(dirty)
if not dirty:
for child in (child for child in (getattr(self, name) for name in self._xml_element_children) if child is not None):
child.__dirty__ = dirty
self.__dict__['__dirty__'] = dirty
__dirty__ = property(__get_dirty__, __set_dirty__)
def check_validity(self):
# check attributes
for name, attribute in list(self._xml_attributes.items()):
# if attribute has default but it was not set, will also be added with this occasion
value = getattr(self, name, None)
if attribute.required and value is None:
raise ValidationError("required attribute %s of %s is not set" % (name, self.__class__.__name__))
# check element children
for name, element_child in list(self._xml_element_children.items()):
# if child has default but it was not set, will also be added with this occasion
child = getattr(self, name, None)
if child is None and element_child.required:
raise ValidationError("element child %s of %s is not set" % (name, self.__class__.__name__))
def to_element(self):
try:
self.check_validity()
except ValidationError as e:
raise BuilderError(str(e))
# build element children
for name in self._xml_element_children:
descriptor = getattr(self.__class__, name)
child = descriptor.__get__(self, StoredAttribute)
if child is not None:
child.to_element()
self._build_element()
return self.element
@classmethod
def from_element(cls, element, xml_document=None):
obj = cls.__new__(cls)
obj._xml_document = xml_document if xml_document is not None else cls._xml_document
obj.element = element
# set known attributes
for name, attribute in list(cls._xml_attributes.items()):
xmlvalue = element.get(attribute.xmlname, None)
if xmlvalue is not None:
try:
setattr(obj, name, attribute.parse(xmlvalue))
except (ValueError, TypeError):
raise ValidationError("got illegal value for attribute %s of %s: %s" % (name, cls.__name__, xmlvalue))
# set element children
for child in element:
element_child, type = cls._xml_children_qname_map.get(child.tag, (None, None))
if element_child is not None:
try:
value = type.from_element(child, xml_document=obj._xml_document)
except ValidationError:
pass # we should accept partially valid documents
else:
setattr(obj, element_child.name, value)
obj._parse_element(element)
obj.check_validity()
obj.__dirty__ = False
return obj
@classmethod
def _register_xml_attribute(cls, attribute, element):
cls._xml_element_children[attribute] = element
cls._xml_children_qname_map[element.type.qname] = (element, element.type)
for subclass in cls.__subclasses__():
subclass._register_xml_attribute(attribute, element)
@classmethod
def _unregister_xml_attribute(cls, attribute):
element = cls._xml_element_children.pop(attribute)
del cls._xml_children_qname_map[element.type.qname]
for subclass in cls.__subclasses__():
subclass._unregister_xml_attribute(attribute)
@classmethod
def register_extension(cls, attribute, type, test_equal=True):
if cls._xml_extension_type is None:
raise ValueError("XMLElement type %s does not support extensions (requested extension type %s)" % (cls.__name__, type.__name__))
elif not issubclass(type, cls._xml_extension_type):
raise TypeError("XMLElement type %s only supports extensions of type %s (requested extension type %s)" % (cls.__name__, cls._xml_extension_type, type.__name__))
elif hasattr(cls, attribute):
raise ValueError("XMLElement type %s already has an attribute named %s (requested extension type %s)" % (cls.__name__, attribute, type.__name__))
extension = XMLElementChild(attribute, type=type, required=False, test_equal=test_equal)
setattr(cls, attribute, extension)
cls._register_xml_attribute(attribute, extension)
@classmethod
def unregister_extension(cls, attribute):
if cls._xml_extension_type is None:
raise ValueError("XMLElement type %s does not support extensions" % cls.__name__)
cls._unregister_xml_attribute(attribute)
delattr(cls, attribute)
def _insert_element(self, element):
if element in self.element:
return
order = self._xml_children_order.get(element.tag, self._xml_children_order.get(None, sys.maxsize))
for i in range(len(self.element)):
child_order = self._xml_children_order.get(self.element[i].tag, self._xml_children_order.get(None, sys.maxsize))
if child_order > order:
position = i
break
else:
position = len(self.element)
self.element.insert(position, element)
def __eq__(self, other):
if isinstance(other, XMLElement):
if self is other:
return True
for name, attribute in list(self._xml_attributes.items()):
if attribute.test_equal:
if not hasattr(other, name) or getattr(self, name) != getattr(other, name):
return False
for name, element_child in list(self._xml_element_children.items()):
if element_child.test_equal:
if not hasattr(other, name) or getattr(self, name) != getattr(other, name):
return False
try:
__eq__ = super(XMLElement, self).__eq__
except AttributeError:
return True
else:
return __eq__(other)
elif self._xml_id is not None:
return self._xml_id == other
else:
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __hash__(self):
if self._xml_id is not None:
return hash(self._xml_id)
else:
return object.__hash__(self)
class XMLRootElementType(XMLElementType):
def __init__(cls, name, bases, dct):
super(XMLRootElementType, cls).__init__(name, bases, dct)
if cls._xml_document is not None:
if cls._xml_document.root_element is not None:
raise TypeError('there is already a root element registered for %s' % cls.__name__)
cls._xml_document.root_element = cls
class XMLRootElement(XMLElement, metaclass=XMLRootElementType):
def __init__(self):
XMLElement.__init__(self)
self.__cache__ = WeakValueDictionary({self.element: self})
@classmethod
def from_element(cls, element, xml_document=None):
obj = super(XMLRootElement, cls).from_element(element, xml_document)
obj.__cache__ = WeakValueDictionary({obj.element: obj})
return obj
@classmethod
def parse(cls, document):
return cls._xml_document.parse(document)
def toxml(self, encoding=None, pretty_print=False, validate=True):
return self._xml_document.build(self, encoding=encoding, pretty_print=pretty_print, validate=validate)
def xpath(self, xpath, namespaces=None):
result = []
try:
nodes = self.element.xpath(xpath, namespaces=namespaces)
except etree.XPathError:
raise ValueError("illegal XPath expression")
for element in (node for node in nodes if isinstance(node, etree._Element)):
if element in self.__cache__:
result.append(self.__cache__[element])
continue
if element is self.element:
self.__cache__[element] = self
result.append(self)
continue
for ancestor in element.iterancestors():
if ancestor in self.__cache__:
container = self.__cache__[ancestor]
break
else:
container = self
notvisited = deque([container])
visited = set()
while notvisited:
container = notvisited.popleft()
self.__cache__[container.element] = container
if isinstance(container, XMLListMixin):
children = set(child for child in container if isinstance(child, XMLElement) and child not in visited)
visited.update(children)
notvisited.extend(children)
for child in container._xml_element_children:
value = getattr(container, child)
if value is not None and value not in visited:
visited.add(value)
notvisited.append(value)
if element in self.__cache__:
result.append(self.__cache__[element])
return result
def get_xpath(self, element):
raise NotImplementedError
def find_parent(self, element):
raise NotImplementedError
## Mixin classes
class ThisClass(object):
"""
Special marker class that is used to indicate that an XMLListElement
subclass can be an item of itself. This is necessary because a class
cannot reference itself when defining _xml_item_type
"""
class XMLListMixinType(type):
def __init__(cls, name, bases, dct):
super(XMLListMixinType, cls).__init__(name, bases, dct)
if '_xml_item_type' in dct:
cls._xml_item_type = cls._xml_item_type # trigger __setattr__
def __setattr__(cls, name, value):
if name == '_xml_item_type':
if value is ThisClass:
value = cls
elif isinstance(value, tuple) and ThisClass in value:
value = tuple(cls if type is ThisClass else type for type in value)
if value is None:
cls._xml_item_element_types = ()
cls._xml_item_extension_types = ()
else:
item_types = value if isinstance(value, tuple) else (value,)
cls._xml_item_element_types = tuple(type for type in item_types if issubclass(type, XMLElement))
cls._xml_item_extension_types = tuple(type for type in item_types if not issubclass(type, XMLElement))
super(XMLListMixinType, cls).__setattr__(name, value)
class XMLListMixin(XMLElementBase, metaclass=XMLListMixinType):
"""A mixin representing a list of other XML elements"""
_xml_item_type = None
def __new__(cls, *args, **kw):
if cls._xml_item_type is None:
raise TypeError("The %s class cannot be instantiated because it doesn't define the _xml_item_type attribute" % cls.__name__)
instance = super(XMLListMixin, cls).__new__(cls)
instance._element_map = {}
instance._xmlid_map = defaultdict(dict)
return instance
def __contains__(self, item):
return item in iter(list(self._element_map.values()))
def __iter__(self):
return (self._element_map[element] for element in self.element if element in self._element_map)
def __len__(self):
return len(self._element_map)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, list(self))
def __eq__(self, other):
if isinstance(other, XMLListMixin):
return self is other or (len(self) == len(other) and all(self_item == other_item for self_item, other_item in zip(self, other)))
else:
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
def __getitem__(self, key):
if key is IterateTypes:
return (cls for cls, mapping in list(self._xmlid_map.items()) if mapping)
if not isinstance(key, tuple):
raise KeyError(key)
try:
cls, id = key
except ValueError:
raise KeyError(key)
if id is IterateIDs:
return iter(list(self._xmlid_map[cls].keys()))
elif id is IterateItems:
return iter(list(self._xmlid_map[cls].values()))
else:
return self._xmlid_map[cls][id]
def __delitem__(self, key):
if not isinstance(key, tuple):
raise KeyError(key)
try:
cls, id = key
except ValueError:
raise KeyError(key)
if id is All:
for item in list(self._xmlid_map[cls].values()):
self.remove(item)
else:
self.remove(self._xmlid_map[cls][id])
def __get_dirty__(self):
return any(item.__dirty__ for item in list(self._element_map.values())) or super(XMLListMixin, self).__get_dirty__()
def __set_dirty__(self, dirty):
super(XMLListMixin, self).__set_dirty__(dirty)
if not dirty:
for item in list(self._element_map.values()):
item.__dirty__ = dirty
def _parse_element(self, element):
super(XMLListMixin, self)._parse_element(element)
self._element_map.clear()
self._xmlid_map.clear()
for child in element[:]:
child_class = self._xml_document.get_element(child.tag, type(None))
if child_class in self._xml_item_element_types or issubclass(child_class, self._xml_item_extension_types):
try:
value = child_class.from_element(child, xml_document=self._xml_document)
except ValidationError:
pass
else:
if value._xml_id is not None and value._xml_id in self._xmlid_map[child_class]:
element.remove(child)
else:
if value._xml_id is not None:
self._xmlid_map[child_class][value._xml_id] = value
self._element_map[value.element] = value
def _build_element(self):
super(XMLListMixin, self)._build_element()
for child in list(self._element_map.values()):
child.to_element()
def add(self, item):
if not (item.__class__ in self._xml_item_element_types or isinstance(item, self._xml_item_extension_types)):
raise TypeError("%s cannot add items of type %s" % (self.__class__.__name__, item.__class__.__name__))
same_value = False
if item._xml_id is not None and item._xml_id in self._xmlid_map[item.__class__]:
old_item = self._xmlid_map[item.__class__][item._xml_id]
if item is old_item:
return
elif item == old_item:
item.__dirty__ = old_item.__dirty__
same_value = True
self.element.remove(old_item.element)
del self._xmlid_map[item.__class__][item._xml_id]
del self._element_map[old_item.element]
self._insert_element(item.element)
if item._xml_id is not None:
self._xmlid_map[item.__class__][item._xml_id] = item
self._element_map[item.element] = item
if not same_value:
self.__dirty__ = True
def remove(self, item):
self.element.remove(item.element)
if item._xml_id is not None:
del self._xmlid_map[item.__class__][item._xml_id]
del self._element_map[item.element]
self.__dirty__ = True
def update(self, sequence):
for item in sequence:
self.add(item)
def clear(self):
for item in list(self._element_map.values()):
self.remove(item)
## Element types
class XMLSimpleElement(XMLElement):
_xml_value_type = None # To be defined in subclass
def __new__(cls, *args, **kw):
if cls._xml_value_type is None:
raise TypeError("The %s class cannot be instantiated because it doesn't define the _xml_value_type attribute" % cls.__name__)
return super(XMLSimpleElement, cls).__new__(cls)
def __init__(self, value):
XMLElement.__init__(self)
self.value = value
def __eq__(self, other):
if isinstance(other, XMLSimpleElement):
return self is other or self.value == other.value
else:
return self.value == other
def __bool__(self):
return bool(self.value)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.value)
def __str__(self):
return str(self.value)
def __unicode__(self):
return str(self.value)
@property
def value(self):
return self.__dict__['value']
@value.setter
def value(self, value):
if not isinstance(value, self._xml_value_type):
value = self._xml_value_type(value)
if self.__dict__.get('value', Null) == value:
return
self.__dict__['value'] = value
self.__dirty__ = True
def _parse_element(self, element):
super(XMLSimpleElement, self)._parse_element(element)
value = element.text or ''
if hasattr(self._xml_value_type, '__xmlparse__'):
self.value = self._xml_value_type.__xmlparse__(value)
else:
self.value = self._xml_value_type(value)
def _build_element(self):
super(XMLSimpleElement, self)._build_element()
if hasattr(self.value, '__xmlbuild__'):
self.element.text = self.value.__xmlbuild__()
else:
self.element.text = str(self.value)
class XMLStringElement(XMLSimpleElement):
_xml_value_type = str # Can be overwritten in subclasses
def __len__(self):
return len(self.value)
class XMLLocalizedStringElement(XMLStringElement):
lang = XMLAttribute('lang', xmlname='{http://www.w3.org/XML/1998/namespace}lang', type=str, required=False, test_equal=True)
def __init__(self, value, lang=None):
XMLStringElement.__init__(self, value)
self.lang = lang
def __eq__(self, other):
if isinstance(other, XMLLocalizedStringElement):
return self is other or (self.lang == other.lang and self.value == other.value)
elif self.lang is None:
return XMLStringElement.__eq__(self, other)
else:
return NotImplemented
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.value, self.lang)
def _parse_element(self, element):
super(XMLLocalizedStringElement, self)._parse_element(element)
self.lang = element.get('{http://www.w3.org/XML/1998/namespace}lang', None)
class XMLBooleanElement(XMLSimpleElement):
_xml_value_type = Boolean
class XMLByteElement(XMLSimpleElement):
_xml_value_type = Byte
class XMLUnsignedByteElement(XMLSimpleElement):
_xml_value_type = UnsignedByte
class XMLShortElement(XMLSimpleElement):
_xml_value_type = Short
class XMLUnsignedShortElement(XMLSimpleElement):
_xml_value_type = UnsignedShort
class XMLIntElement(XMLSimpleElement):
_xml_value_type = Int
class XMLUnsignedIntElement(XMLSimpleElement):
_xml_value_type = UnsignedInt
class XMLLongElement(XMLSimpleElement):
_xml_value_type = Long
class XMLUnsignedLongElement(XMLSimpleElement):
_xml_value_type = UnsignedLong
class XMLIntegerElement(XMLSimpleElement):
_xml_value_type = int
class XMLPositiveIntegerElement(XMLSimpleElement):
_xml_value_type = PositiveInteger
class XMLNegativeIntegerElement(XMLSimpleElement):
_xml_value_type = NegativeInteger
class XMLNonNegativeIntegerElement(XMLSimpleElement):
_xml_value_type = NonNegativeInteger
class XMLNonPositiveIntegerElement(XMLSimpleElement):
_xml_value_type = NonPositiveInteger
class XMLDecimalElement(XMLSimpleElement):
_xml_value_type = Decimal
class XMLDateTimeElement(XMLSimpleElement):
_xml_value_type = DateTime
class XMLAnyURIElement(XMLStringElement):
_xml_value_type = AnyURI
class XMLEmptyElement(XMLElement):
def __repr__(self):
return '%s()' % self.__class__.__name__
def __eq__(self, other):
return type(self) is type(other) or NotImplemented
def __hash__(self):
return hash(self.__class__)
class XMLEmptyElementRegistryType(type):
"""A metaclass for building registries of XMLEmptyElement subclasses from names"""
def __init__(cls, name, bases, dct):
super(XMLEmptyElementRegistryType, cls).__init__(name, bases, dct)
typename = getattr(cls, '__typename__', name.partition('Registry')[0]).capitalize()
class BaseElementType(XMLEmptyElement):
def __str__(self): return self._xml_tag
def __unicode__(self): return str(self._xml_tag)
cls.__basetype__ = BaseElementType
cls.__basetype__.__name__ = 'Base%sType' % typename
cls.class_map = {}
for name in cls.names:
class ElementType(BaseElementType):
_xml_tag = name
_xml_namespace = cls._xml_namespace
_xml_document = cls._xml_document
_xml_id = name
translation_table = dict.fromkeys(map(ord, '-_'), None)
ElementType.__name__ = typename + name.title().translate(translation_table)
cls.class_map[name] = ElementType
cls.classes = tuple(cls.class_map[name] for name in cls.names)
## Created using mixins
class XMLListElementType(XMLElementType, XMLListMixinType): pass
class XMLListRootElementType(XMLRootElementType, XMLListMixinType): pass
class XMLListElement(XMLElement, XMLListMixin, metaclass=XMLListElementType):
def __bool__(self):
if self._xml_attributes or self._xml_element_children:
return True
else:
return len(self._element_map) != 0
class XMLListRootElement(XMLRootElement, XMLListMixin, metaclass=XMLListRootElementType):
def __bool__(self):
if self._xml_attributes or self._xml_element_children:
return True
else:
return len(self._element_map) != 0
class XMLStringListElementType(XMLListElementType):
def __init__(cls, name, bases, dct):
if cls._xml_item_type is not None:
raise TypeError("The %s class should not define _xml_item_type, but define _xml_item_registry, _xml_item_other_type and _xml_item_extension_type instead" % cls.__name__)
types = cls._xml_item_registry.classes if cls._xml_item_registry is not None else ()
types += tuple(type for type in (cls._xml_item_other_type, cls._xml_item_extension_type) if type is not None)
cls._xml_item_type = types or None
super(XMLStringListElementType, cls).__init__(name, bases, dct)
class XMLStringListElement(XMLListElement, metaclass=XMLStringListElementType):
_xml_item_registry = None
_xml_item_other_type = None
_xml_item_extension_type = None
def __contains__(self, item):
if isinstance(item, str):
if self._xml_item_registry is not None and item in self._xml_item_registry.names:
item = self._xml_item_registry.class_map[item]()
elif self._xml_item_other_type is not None:
item = self._xml_item_other_type.from_string(item)
return item in iter(list(self._element_map.values()))
def __iter__(self):
return (item if isinstance(item, self._xml_item_extension_types) else str(item) for item in super(XMLStringListElement, self).__iter__())
def add(self, item):
if isinstance(item, str):
if self._xml_item_registry is not None and item in self._xml_item_registry.names:
item = self._xml_item_registry.class_map[item]()
elif self._xml_item_other_type is not None:
item = self._xml_item_other_type.from_string(item)
super(XMLStringListElement, self).add(item)
def remove(self, item):
if isinstance(item, str):
if self._xml_item_registry is not None and item in self._xml_item_registry.names:
xmlitem = self._xml_item_registry.class_map[item]()
try:
item = next((entry for entry in super(XMLStringListElement, self).__iter__() if entry == xmlitem))
except StopIteration:
raise KeyError(item)
elif self._xml_item_other_type is not None:
xmlitem = self._xml_item_other_type.from_string(item)
try:
item = next((entry for entry in super(XMLStringListElement, self).__iter__() if entry == xmlitem))
except StopIteration:
raise KeyError(item)
super(XMLStringListElement, self).remove(item)