diff --git a/xcap/__init__.py b/xcap/__init__.py index 5b4ce9b..519f535 100644 --- a/xcap/__init__.py +++ b/xcap/__init__.py @@ -1,9 +1,6 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - """XCAP package""" __version__ = "2.3.0" __cfgfile__ = "config.ini" diff --git a/xcap/appusage/__init__.py b/xcap/appusage/__init__.py index 4891cf2..70832e8 100644 --- a/xcap/appusage/__init__.py +++ b/xcap/appusage/__init__.py @@ -1,377 +1,374 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - """XCAP application usage module""" import os import sys from cStringIO import StringIO from lxml import etree from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import StringList from application import log import xcap from xcap import errors from xcap import element from xcap.backend import StatusResponse class Backend(object): """Configuration datatype, used to select a backend module from the configuration file.""" def __new__(typ, value): value = value.lower() try: return __import__('xcap.backend.%s' % value, globals(), locals(), ['']) except (ImportError, AssertionError), e: log.fatal("Cannot load '%s' backend module: %s" % (value, str(e))) sys.exit(1) except Exception, e: log.err() sys.exit(1) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' backend = ConfigSetting(type=Backend, value=None) disabled_applications = ConfigSetting(type=StringList, value=[]) document_validation = True if ServerConfig.backend is None: log.fatal("OpenXCAP needs a backend to be specified in order to run") sys.exit(1) class ApplicationUsage(object): """Base class defining an XCAP application""" id = None ## the Application Unique ID (AUID) default_ns = None ## the default XML namespace mime_type = None ## the MIME type schema_file = None ## filename of the schema for the application def __init__(self, storage): ## the XML schema that defines valid documents for this application if self.schema_file: xml_schema_doc = etree.parse(open(os.path.join(os.path.dirname(__file__), 'xml-schemas', self.schema_file), 'r')) self.xml_schema = etree.XMLSchema(xml_schema_doc) else: class EverythingIsValid(object): def __call__(self, *args, **kw): return True def validate(self, *args, **kw): return True self.xml_schema = EverythingIsValid() if storage is not None: self.storage = storage ## Validation def _check_UTF8_encoding(self, xml_doc): """Check if the document is UTF8 encoded. Raise an NotUTF8Error if it's not.""" if xml_doc.docinfo.encoding.lower() != 'utf-8': raise errors.NotUTF8Error(comment='document encoding is %s' % xml_doc.docinfo.encoding) def _check_schema_validation(self, xml_doc): """Check if the given XCAP document validates against the application's schema""" if not self.xml_schema(xml_doc): raise errors.SchemaValidationError(comment=self.xml_schema.error_log) def _check_additional_constraints(self, xml_doc): """Check additional validations constraints for this XCAP document. Should be overriden in subclasses if specified by the application usage, and raise a ConstraintFailureError if needed.""" def validate_document(self, xcap_doc): """Check if a document is valid for this application.""" try: xml_doc = etree.parse(StringIO(xcap_doc)) # XXX do not use TreeBuilder here except etree.XMLSyntaxError, ex: ex.http_error = errors.NotWellFormedError(comment=str(ex)) raise except Exception, ex: ex.http_error = errors.NotWellFormedError() raise self._check_UTF8_encoding(xml_doc) if ServerConfig.document_validation: self._check_schema_validation(xml_doc) self._check_additional_constraints(xml_doc) ## Authorization policy def is_authorized(self, xcap_user, xcap_uri): """Default authorization policy. Authorizes an XCAPUser for an XCAPUri. Return True if the user is authorized, False otherwise.""" return xcap_user and xcap_user == xcap_uri.user ## Document management def _not_implemented(self, context): raise errors.ResourceNotFound("Application %s does not implement %s context" % (self.id, context)) def get_document(self, uri, check_etag): context = uri.doc_selector.context if context == 'global': return self.get_document_global(uri, check_etag) elif context == 'users': return self.get_document_local(uri, check_etag) else: self._not_implemented(context) def get_document_global(self, uri, check_etag): self._not_implemented('global') def get_document_local(self, uri, check_etag): return self.storage.get_document(uri, check_etag) def put_document(self, uri, document, check_etag): self.validate_document(document) return self.storage.put_document(uri, document, check_etag) def delete_document(self, uri, check_etag): return self.storage.delete_document(uri, check_etag) ## Element management def _cb_put_element(self, response, uri, element_body, check_etag): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: ### XXX let the storate raise raise errors.NoParentError ### catch error in errback and attach http_error fixed_element_selector = uri.node_selector.element_selector.fix_star(element_body) try: result = element.put(response.data, fixed_element_selector, element_body) except element.SelectorError, ex: ex.http_error = errors.NoParentError(comment=str(ex)) raise if result is None: raise errors.NoParentError new_document, created = result get_result = element.get(new_document, uri.node_selector.element_selector) if get_result != element_body.strip(): raise errors.CannotInsertError('PUT request failed GET(PUT(x))==x invariant') d = self.put_document(uri, new_document, check_etag) def set_201_code(response): try: if response.code==200: response.code = 201 except AttributeError: pass return response if created: d.addCallback(set_201_code) return d def put_element(self, uri, element_body, check_etag): try: element.check_xml_fragment(element_body) except element.sax.SAXParseException, ex: ex.http_error = errors.NotXMLFragmentError(comment=str(ex)) raise except Exception, ex: ex.http_error = errors.NotXMLFragmentError() raise d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_put_element, callbackArgs=(uri, element_body, check_etag)) def _cb_get_element(self, response, uri): """This is called when the document related to the element is retrieved.""" if response.code == 404: ## XXX why not let the storage raise? raise errors.ResourceNotFound("The requested document %s was not found on this server" % uri.doc_selector) result = element.get(response.data, uri.node_selector.element_selector) if not result: msg = "The requested element %s was not found in the document %s" % (uri.node_selector, uri.doc_selector) raise errors.ResourceNotFound(msg) return StatusResponse(200, response.etag, result) def get_element(self, uri, check_etag): d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_get_element, callbackArgs=(uri, )) def _cb_delete_element(self, response, uri, check_etag): if response.code == 404: raise errors.ResourceNotFound("The requested document %s was not found on this server" % uri.doc_selector) new_document = element.delete(response.data, uri.node_selector.element_selector) if not new_document: raise errors.ResourceNotFound get_result = element.find(new_document, uri.node_selector.element_selector) if get_result: raise errors.CannotDeleteError('DELETE request failed GET(DELETE(x))==404 invariant') return self.put_document(uri, new_document, check_etag) def delete_element(self, uri, check_etag): d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_delete_element, callbackArgs=(uri, check_etag)) ## Attribute management def _cb_get_attribute(self, response, uri): """This is called when the document that relates to the attribute is retrieved.""" if response.code == 404: raise errors.ResourceNotFound document = response.data xml_doc = etree.parse(StringIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: xpath = uri.node_selector.replace_default_prefix() attribute = xml_doc.xpath(xpath, namespaces = ns_dict) except Exception, ex: ex.http_error = errors.ResourceNotFound() raise if not attribute: raise errors.ResourceNotFound elif len(attribute) != 1: raise errors.ResourceNotFound('XPATH expression is ambiguous') # TODO # The server MUST NOT add namespace bindings representing namespaces # used by the element or its children, but declared in ancestor elements return StatusResponse(200, response.etag, attribute[0]) def get_attribute(self, uri, check_etag): d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_get_attribute, callbackArgs=(uri, )) def _cb_delete_attribute(self, response, uri, check_etag): if response.code == 404: raise errors.ResourceNotFound document = response.data xml_doc = etree.parse(StringIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception, ex: ex.http_error = errors.ResourceNotFound() raise if not elem: raise errors.ResourceNotFound if len(elem) != 1: raise errors.ResourceNotFound('XPATH expression is ambiguous') elem = elem[0] attribute = uri.node_selector.terminal_selector.attribute if elem.get(attribute): ## check if the attribute exists XXX use KeyError instead del elem.attrib[attribute] else: raise errors.ResourceNotFound new_document = etree.tostring(xml_doc, encoding='UTF-8', xml_declaration=True) return self.put_document(uri, new_document, check_etag) def delete_attribute(self, uri, check_etag): d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_delete_attribute, callbackArgs=(uri, check_etag)) def _cb_put_attribute(self, response, uri, attribute, check_etag): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: raise errors.NoParentError document = response.data xml_doc = etree.parse(StringIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception, ex: ex.http_error = errors.NoParentError() raise if not elem: raise errors.NoParentError if len(elem) != 1: raise errors.NoParentError('XPATH expression is ambiguous') elem = elem[0] attr_name = uri.node_selector.terminal_selector.attribute elem.set(attr_name, attribute) new_document = etree.tostring(xml_doc, encoding='UTF-8', xml_declaration=True) return self.put_document(uri, new_document, check_etag) def put_attribute(self, uri, attribute, check_etag): ## TODO verify if the attribute is valid d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_put_attribute, callbackArgs=(uri, attribute, check_etag)) ## Namespace Bindings def _cb_get_ns_bindings(self, response, uri): """This is called when the document that relates to the element is retrieved.""" if response.code == 404: raise errors.ResourceNotFound document = response.data xml_doc = etree.parse(StringIO(document)) application = getApplicationForURI(uri) ns_dict = uri.node_selector.get_ns_bindings(application.default_ns) try: elem = xml_doc.xpath(uri.node_selector.replace_default_prefix(append_terminal=False),namespaces=ns_dict) except Exception, ex: ex.http_error = errors.ResourceNotFound() raise if not elem: raise errors.ResourceNotFound elif len(elem)!=1: raise errors.ResourceNotFound('XPATH expression is ambiguous') elem = elem[0] namespaces = '' for prefix, ns in elem.nsmap.items(): namespaces += ' xmlns%s="%s"' % (prefix and ':%s' % prefix or '', ns) result = '<%s %s/>' % (elem.tag, namespaces) return StatusResponse(200, response.etag, result) def get_ns_bindings(self, uri, check_etag): d = self.get_document(uri, check_etag) return d.addCallbacks(self._cb_get_ns_bindings, callbackArgs=(uri, )) from xcap.appusage.capabilities import XCAPCapabilitiesApplication from xcap.appusage.dialogrules import DialogRulesApplication from xcap.appusage.directory import XCAPDirectoryApplication from xcap.appusage.pidf import PIDFManipulationApplication from xcap.appusage.prescontent import PresContentApplication from xcap.appusage.presrules import PresenceRulesApplication from xcap.appusage.purge import PurgeApplication from xcap.appusage.resourcelists import ResourceListsApplication from xcap.appusage.rlsservices import RLSServicesApplication from xcap.appusage.test import TestApplication from xcap.appusage.watchers import WatchersApplication storage = ServerConfig.backend.Storage() applications = { DialogRulesApplication.id: DialogRulesApplication(storage), PIDFManipulationApplication.id: PIDFManipulationApplication(storage), PresenceRulesApplication.id: PresenceRulesApplication(storage), PresenceRulesApplication.oma_id: PresenceRulesApplication(storage), PurgeApplication.id: PurgeApplication(storage), ResourceListsApplication.id: ResourceListsApplication(storage), RLSServicesApplication.id: RLSServicesApplication(storage), TestApplication.id: TestApplication(storage), WatchersApplication.id: WatchersApplication(storage), XCAPCapabilitiesApplication.id: XCAPCapabilitiesApplication(), XCAPDirectoryApplication.id: XCAPDirectoryApplication(storage) } # public GET applications (GET is not challenged for auth) public_get_applications = {PresContentApplication.id: PresContentApplication(storage)} applications.update(public_get_applications) for application in ServerConfig.disabled_applications: applications.pop(application, None) namespaces = dict((k, v.default_ns) for (k, v) in applications.items()) def getApplicationForURI(xcap_uri): return applications.get(xcap_uri.application_id, None) __all__ = ['applications', 'namespaces', 'public_get_applications', 'getApplicationForURI', 'ApplicationUsage', 'Backend'] diff --git a/xcap/appusage/capabilities.py b/xcap/appusage/capabilities.py index fe0fe19..f4e18f2 100644 --- a/xcap/appusage/capabilities.py +++ b/xcap/appusage/capabilities.py @@ -1,47 +1,44 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from lxml import etree from twisted.internet import defer from xcap import errors from xcap.appusage import ApplicationUsage from xcap.dbutil import make_etag from xcap.backend import StatusResponse class XCAPCapabilitiesApplication(ApplicationUsage): ## RFC 4825 id = "xcap-caps" default_ns = "urn:ietf:params:xml:ns:xcap-caps" mime_type= "application/xcap-caps+xml" def __init__(self): pass def _get_document(self): if hasattr(self, 'doc'): return self.doc, self.etag root = etree.Element("xcap-caps", nsmap={None: self.default_ns}) auids = etree.SubElement(root, "auids") extensions = etree.SubElement(root, "extensions") namespaces = etree.SubElement(root, "namespaces") from xcap.appusage import applications for (id, app) in applications.items(): etree.SubElement(auids, "auid").text = id etree.SubElement(namespaces, "namespace").text = app.default_ns self.doc = etree.tostring(root, encoding="UTF-8", pretty_print=True, xml_declaration=True) self.etag = make_etag('xcap-caps', self.doc) return self.doc, self.etag def get_document_global(self, uri, check_etag): doc, etag = self._get_document() return defer.succeed(StatusResponse(200, etag=etag, data=doc)) def get_document_local(self, uri, check_etag): self._not_implemented('users') def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") diff --git a/xcap/appusage/dialogrules.py b/xcap/appusage/dialogrules.py index 3ccb082..b497570 100644 --- a/xcap/appusage/dialogrules.py +++ b/xcap/appusage/dialogrules.py @@ -1,13 +1,10 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from xcap.appusage import ApplicationUsage class DialogRulesApplication(ApplicationUsage): id = "org.openxcap.dialog-rules" default_ns = "http://openxcap.org/ns/dialog-rules" mime_type = "application/auth-policy+xml" schema_file = 'common-policy.xsd' diff --git a/xcap/appusage/directory.py b/xcap/appusage/directory.py index 970bc83..1ba61d8 100644 --- a/xcap/appusage/directory.py +++ b/xcap/appusage/directory.py @@ -1,42 +1,39 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from lxml import etree from twisted.internet import defer from xcap import errors from xcap.appusage import ApplicationUsage from xcap.backend import StatusResponse class XCAPDirectoryApplication(ApplicationUsage): id = "org.openmobilealliance.xcap-directory" default_ns = "urn:oma:xml:xdm:xcap-directory" mime_type= "application/vnd.oma.xcap-directory+xml" schema_file = "xcap-directory.xsd" def _docs_to_xml(self, docs, uri): sip_uri = "sip:%s@%s" % (uri.user.username, uri.user.domain) root = etree.Element("xcap-directory", nsmap={None: self.default_ns}) if docs: for k, v in docs.iteritems(): folder = etree.SubElement(root, "folder", attrib={'auid': k}) for item in v: # We may have more than one document for the same application entry_uri = "%s/%s/users/%s/%s" % (uri.xcap_root, k, sip_uri, item[0]) entry = etree.SubElement(folder, "entry") entry.set("uri", entry_uri) entry.set("etag", item[1]) doc = etree.tostring(root, encoding="UTF-8", pretty_print=True, xml_declaration=True) #self.validate_document(doc) return defer.succeed(StatusResponse(200, etag=None, data=doc)) def get_document_local(self, uri, check_etag): docs_def = self.storage.get_documents_list(uri) docs_def.addCallback(self._docs_to_xml, uri) return docs_def def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") diff --git a/xcap/appusage/pidf.py b/xcap/appusage/pidf.py index 80b1129..f6fe2c2 100644 --- a/xcap/appusage/pidf.py +++ b/xcap/appusage/pidf.py @@ -1,14 +1,11 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from xcap.appusage import ApplicationUsage class PIDFManipulationApplication(ApplicationUsage): ## RFC 4827 id = "pidf-manipulation" default_ns = "urn:ietf:params:xml:ns:pidf" mime_type= "application/pidf+xml" schema_file = 'pidf.xsd' diff --git a/xcap/appusage/prescontent.py b/xcap/appusage/prescontent.py index 66e964d..cc411c3 100644 --- a/xcap/appusage/prescontent.py +++ b/xcap/appusage/prescontent.py @@ -1,52 +1,49 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from cStringIO import StringIO from lxml import etree from xcap import errors from xcap.appusage import ApplicationUsage class PresContentApplication(ApplicationUsage): id = "org.openmobilealliance.pres-content" default_ns = "urn:oma:xml:prs:pres-content" mime_type = "application/vnd.oma.pres-content+xml" icon_mime_types = ('image/jpeg', 'image/gif', 'image/png') icon_encoding = 'base64' icon_max_size = 300*1024 def _validate_icon(self, document): mime_type = None encoding = None data = None xml = StringIO(document) try: tree = etree.parse(xml) root = tree.getroot() ns = root.nsmap[None] for element in root: if element.tag == "{%s}mime-type" % ns: mime_type = element.text.lower() if element.tag == "{%s}encoding" % ns: encoding = element.text.lower() if element.tag == "{%s}data" % ns: data = element.text except etree.ParseError: raise errors.NotWellFormedError() else: if mime_type not in self.icon_mime_types: raise errors.ConstraintFailureError(phrase="Unsupported MIME type. Allowed MIME types: %s" % ','.join(self.icon_mime_types)) if encoding != self.icon_encoding: raise errors.ConstraintFailureError(phrase="Unsupported encoding. Allowed enconding: %s" % self.icon_encoding) if data is None: raise errors.ConstraintFailureError(phrase="No icon data was provided") if len(data) > self.icon_max_size: raise errors.ConstraintFailureError(phrase="Size limit exceeded, maximum allowed size is %d bytes" % self.icon_max_size) def put_document(self, uri, document, check_etag): if uri.doc_selector.document_path.startswith('oma_status-icon'): self._validate_icon(document) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/presrules.py b/xcap/appusage/presrules.py index 7d8a453..913a108 100644 --- a/xcap/appusage/presrules.py +++ b/xcap/appusage/presrules.py @@ -1,119 +1,116 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from application.configuration import ConfigSection, ConfigSetting from cStringIO import StringIO from lxml import etree from urllib import unquote import xcap from xcap import errors from xcap.appusage import ApplicationUsage from xcap.datatypes import XCAPRootURI from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError class AuthenticationConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Authentication' default_realm = ConfigSetting(type=str, value=None) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' allow_external_references = False root = ConfigSetting(type=XCAPRootURI, value=None) def parseExternalListURI(node_uri, default_realm): from xcap.appusage import namespaces xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise errors.ConstraintFailureError("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise errors.ConstraintFailureError("Resource selector missing") try: uri = XCAPUri(xcap_root, resource_selector, namespaces) except (DocumentSelectorError, NodeParsingError), e: raise errors.ConstraintFailureError(phrase=str(e)) else: if uri.user.domain is None: uri.user.domain = default_realm return uri class PresenceRulesApplication(ApplicationUsage): id = "pres-rules" oma_id = "org.openmobilealliance.pres-rules" default_ns = "urn:ietf:params:xml:ns:pres-rules" mime_type = "application/auth-policy+xml" schema_file = 'presence-rules.xsd' def _check_external_list(self, external_list, node_uri): if not external_list: return external_list = unquote(external_list) external_list_uri = parseExternalListURI(external_list, AuthenticationConfig.default_realm) if external_list_uri.xcap_root != node_uri.xcap_root: raise errors.ConstraintFailureError(phrase="XCAP root in the external list doesn't match PUT requests'") if external_list_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another user's list") def _validate_rules(self, document, node_uri): common_policy_namespace = 'urn:ietf:params:xml:ns:common-policy' oma_namespace = 'urn:oma:xml:xdm:common-policy' actions_tag = '{%s}actions' % common_policy_namespace conditions_tag = '{%s}conditions' % common_policy_namespace identity_tag = '{%s}identity' % common_policy_namespace rule_tag = '{%s}rule' % common_policy_namespace transformations_tag = '{%s}transformations' % common_policy_namespace sub_handling_tag = '{%s}sub-handling' % self.default_ns oma_anonymous_request_tag = '{%s}anonymous-request' % oma_namespace oma_entry_tag = '{%s}entry' % oma_namespace oma_external_list_tag = '{%s}external-list' % oma_namespace oma_other_identity_tag = '{%s}other-identity' % oma_namespace try: xml = StringIO(document) tree = etree.parse(xml) root = tree.getroot() if oma_namespace in root.nsmap.values(): # Condition constraints for element in root.iter(conditions_tag): if any([len(element.findall(item)) > 1 for item in (identity_tag, oma_external_list_tag, oma_other_identity_tag, oma_anonymous_request_tag)]): raise errors.ConstraintFailureError(phrase="Complex rules are not allowed") # Transformations constraints for rule in root.iter(rule_tag): actions = rule.find(actions_tag) if actions is not None: sub_handling = actions.find(sub_handling_tag) transformations = rule.find(transformations_tag) if sub_handling is not None and sub_handling.text != 'allow' and transformations is not None and transformations.getchildren(): raise errors.ConstraintFailureError(phrase="transformations element not allowed") # External list constraints if not ServerConfig.allow_external_references: for element in root.iter(oma_external_list_tag): for entry in element.iter(oma_entry_tag): self._check_external_list(entry.attrib.get('anc', None), node_uri) except etree.ParseError: raise errors.NotWellFormedError() def put_document(self, uri, document, check_etag): self.validate_document(document) self._validate_rules(document, uri) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/purge.py b/xcap/appusage/purge.py index 06bc1ca..6c08241 100644 --- a/xcap/appusage/purge.py +++ b/xcap/appusage/purge.py @@ -1,26 +1,24 @@ -# Copyright (C) 2011 AG-Projects. -# from xcap import errors from xcap.appusage import ApplicationUsage from xcap.backend import StatusResponse class PurgeApplication(ApplicationUsage): id = "org.openxcap.purge" default_ns = "http://openxcap.org/ns/purge" def _purge_cb(self, result, uri): return StatusResponse(200) def get_document_local(self, uri, check_etag): d = self.storage.delete_documents(uri) d.addCallback(self._purge_cb, uri) return d def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") def delete_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support DELETE method") diff --git a/xcap/appusage/resourcelists.py b/xcap/appusage/resourcelists.py index 5e534e1..8d57bf7 100644 --- a/xcap/appusage/resourcelists.py +++ b/xcap/appusage/resourcelists.py @@ -1,152 +1,149 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from application.configuration import ConfigSection, ConfigSetting from cStringIO import StringIO from lxml import etree from urllib import unquote from urlparse import urlparse import xcap from xcap import errors from xcap.appusage import ApplicationUsage from xcap.datatypes import XCAPRootURI from xcap.uri import XCAPUri from xcap.xpath import DocumentSelectorError, NodeParsingError class AuthenticationConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Authentication' default_realm = ConfigSetting(type=str, value=None) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' allow_external_references = False root = ConfigSetting(type=XCAPRootURI, value=None) def parseExternalListURI(node_uri, default_realm): from xcap.appusage import namespaces xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise errors.ConstraintFailureError("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector == '/': raise errors.ConstraintFailureError("Resource selector missing") try: uri = XCAPUri(xcap_root, resource_selector, namespaces) except (DocumentSelectorError, NodeParsingError), e: raise errors.ConstraintFailureError(phrase=str(e)) else: if uri.user.domain is None: uri.user.domain = default_realm return uri def get_xpath(elem): """Return XPATH expression to obtain elem in the document. This could be done better, of course, not using stars, but the real tags. But that would be much more complicated and I'm not sure if such effort is justified""" res = '' while elem is not None: parent = elem.getparent() if parent is None: res = '/*' + res else: res = '/*[%s]' % parent.index(elem) + res elem = parent return res def attribute_not_unique(elem, attr): raise errors.UniquenessFailureError(exists = get_xpath(elem) + '/@' + attr) class ResourceListsApplication(ApplicationUsage): # RFC 4826 id = "resource-lists" default_ns = "urn:ietf:params:xml:ns:resource-lists" mime_type= "application/resource-lists+xml" schema_file = 'resource-lists.xsd' @classmethod def check_list(cls, element, node_uri): from xcap.authentication import parseNodeURI entry_tag = "{%s}entry" % cls.default_ns entry_ref_tag = "{%s}entry-ref" % cls.default_ns external_tag ="{%s}external" % cls.default_ns list_tag = "{%s}list" % cls.default_ns anchor_attrs = set() name_attrs = set() ref_attrs = set() uri_attrs = set() for child in element.getchildren(): if child.tag == list_tag: name = child.get("name") if name in name_attrs: attribute_not_unique(child, 'name') else: name_attrs.add(name) cls.check_list(child, node_uri) elif child.tag == entry_tag: uri = child.get("uri") if uri in uri_attrs: attribute_not_unique(child, 'uri') else: uri_attrs.add(uri) elif child.tag == entry_ref_tag: ref = child.get("ref") if ref in ref_attrs: attribute_not_unique(child, 'ref') else: try: ref = unquote(ref) ref_uri = parseNodeURI("%s/%s" % (node_uri.xcap_root, ref), AuthenticationConfig.default_realm) if not ServerConfig.allow_external_references and ref_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another users' list") try: if ref_uri.node_selector.element_selector[-1].name[1] != "entry": raise ValueError except LookupError: raise ValueError except (DocumentSelectorError, NodeParsingError), e: raise errors.ConstraintFailureError(phrase=str(e)) except ValueError: raise errors.ConstraintFailureError else: ref_attrs.add(ref) elif child.tag == external_tag: anchor = child.get("anchor") if anchor in anchor_attrs: attribute_not_unique(child, 'anchor') else: anchor = unquote(anchor) if not ServerConfig.allow_external_references: external_list_uri = parseExternalListURI(anchor, AuthenticationConfig.default_realm) if external_list_uri.xcap_root != node_uri.xcap_root: raise errors.ConstraintFailureError(phrase="XCAP root in the external list doesn't match PUT requests'") if external_list_uri.user != node_uri.user: raise errors.ConstraintFailureError(phrase="Cannot link to another users' list") else: parsed_url = urlparse(anchor) if parsed_url.scheme not in ('http', 'https'): raise errors.ConstraintFailureError(phrase='Specified anchor is not a valid URL') else: anchor_attrs.add(anchor) def put_document(self, uri, document, check_etag): self.validate_document(document) # Check additional constraints (see section 3.4.5 of RFC 4826) xml_doc = etree.parse(StringIO(document)) self.check_list(xml_doc.getroot(), uri) return self.storage.put_document(uri, document, check_etag) diff --git a/xcap/appusage/rlsservices.py b/xcap/appusage/rlsservices.py index 0745020..757d096 100644 --- a/xcap/appusage/rlsservices.py +++ b/xcap/appusage/rlsservices.py @@ -1,20 +1,17 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from xcap.appusage import ApplicationUsage from xcap.appusage.resourcelists import ResourceListsApplication class RLSServicesApplication(ApplicationUsage): ## RFC 4826 id = "rls-services" default_ns = "urn:ietf:params:xml:ns:rls-services" mime_type= "application/rls-services+xml" schema_file = 'rls-services.xsd' def _check_additional_constraints(self, xml_doc): """Check additional constraints (see section 3.4.5 of RFC 4826).""" ResourceListsApplication.check_list(xml_doc.getroot(), "{%s}list" % self.default_ns) diff --git a/xcap/appusage/test.py b/xcap/appusage/test.py index 7087b95..09636d8 100644 --- a/xcap/appusage/test.py +++ b/xcap/appusage/test.py @@ -1,14 +1,11 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from xcap.appusage import ApplicationUsage class TestApplication(ApplicationUsage): "Application for tests described in Section 8.2.3. Creation of RFC 4825" id = "test-app" default_ns = 'test-app' mime_type= "application/test-app+xml" schema_file = None diff --git a/xcap/appusage/watchers.py b/xcap/appusage/watchers.py index cf741cf..50d7a30 100644 --- a/xcap/appusage/watchers.py +++ b/xcap/appusage/watchers.py @@ -1,38 +1,35 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - from lxml import etree from xcap import errors from xcap.appusage import ApplicationUsage from xcap.dbutil import make_etag from xcap.backend import StatusResponse class WatchersApplication(ApplicationUsage): id = "org.openxcap.watchers" default_ns = "http://openxcap.org/ns/watchers" mime_type= "application/xml" schema_file = 'watchers.xsd' # who needs schema for readonly application? def _watchers_to_xml(self, watchers, uri, check_etag): root = etree.Element("watchers", nsmap={None: self.default_ns}) for watcher in watchers: watcher_elem = etree.SubElement(root, "watcher") for name, value in watcher.iteritems(): etree.SubElement(watcher_elem, name).text = value doc = etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=True) #self.validate_document(doc) etag = make_etag(uri, doc) check_etag(etag) return StatusResponse(200, data=doc, etag=etag) def get_document_local(self, uri, check_etag): watchers_def = self.storage.get_watchers(uri) watchers_def.addCallback(self._watchers_to_xml, uri, check_etag) return watchers_def def put_document(self, uri, document, check_etag): raise errors.ResourceNotFound("This application does not support PUT method") diff --git a/xcap/authentication.py b/xcap/authentication.py index 22fe7d3..5ca2d78 100644 --- a/xcap/authentication.py +++ b/xcap/authentication.py @@ -1,339 +1,336 @@ -# Copyright (C) 2007-2010 AG Projects. -# - """XCAP authentication module""" # XXX this module should be either renamed or refactored as it does more then just auth. from xcap import tweaks; tweaks.tweak_BasicCredentialFactory() from zope.interface import Interface, implements from twisted.internet import defer from twisted.python import failure from twisted.cred import credentials, portal, checkers, error as credError from twisted.web2 import http, server, stream, responsecode, http_headers from twisted.web2.auth.wrapper import HTTPAuthResource, UnauthorizedResponse from application.configuration.datatypes import NetworkRangeList from application.configuration import ConfigSection, ConfigSetting import struct import socket import urlparse import xcap from xcap.datatypes import XCAPRootURI from xcap.appusage import getApplicationForURI, namespaces, public_get_applications from xcap.errors import ResourceNotFound from xcap.uri import XCAPUser, XCAPUri # body of 404 error message to render when user requests xcap-root # it's html, because XCAP root is often published on the web. # NOTE: there're no plans to convert other error messages to html. # Since a web-browser is not the primary tool for accessing XCAP server, text/plain # is easier for clients to present to user/save to logs/etc. WELCOME = ('Not Found' '

Not Found

XCAP server does not serve anything ' 'directly under XCAP Root URL. You have to be more specific.' '

' '
OpenXCAP/%s
' '') % xcap.__version__ class AuthenticationConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Authentication' default_realm = ConfigSetting(type=str, value=None) trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('none')) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' root = ConfigSetting(type=XCAPRootURI, value=None) def generateWWWAuthenticate(headers): _generated = [] for seq in headers: scheme, challenge = seq[0], seq[1] # If we're going to parse out to something other than a dict # we need to be able to generate from something other than a dict try: l = [] for k,v in dict(challenge).iteritems(): l.append("%s=%s" % (k, k in ("algorithm", "stale") and v or http_headers.quoteString(v))) _generated.append("%s %s" % (scheme, ", ".join(l))) except ValueError: _generated.append("%s %s" % (scheme, challenge)) return _generated http_headers.generator_response_headers["WWW-Authenticate"] = (generateWWWAuthenticate,) http_headers.DefaultHTTPHandler.updateGenerators(http_headers.generator_response_headers) del generateWWWAuthenticate def parseNodeURI(node_uri, default_realm): """Parses the given Node URI, containing the XCAP root, document selector, and node selector, and returns an XCAPUri instance if succesful.""" xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector=='/': raise ResourceNotFound(WELCOME, http_headers.MimeType("text", "html")) r = XCAPUri(xcap_root, resource_selector, namespaces) if r.user.domain is None: r.user.domain = default_realm return r class ITrustedPeerCredentials(credentials.ICredentials): def checkPeer(self, trusted_peers): pass class TrustedPeerCredentials(object): implements(ITrustedPeerCredentials) def __init__(self, peer): self.peer = peer def checkPeer(self, trusted_peers): for range in trusted_peers: if struct.unpack('!L', socket.inet_aton(self.peer))[0] & range[1] == range[0]: return True return False class IPublicGetApplicationCredentials(credentials.ICredentials): def checkApplication(self): pass class PublicGetApplicationCredentials(object): implements(IPublicGetApplicationCredentials) def checkApplication(self): return True ## credentials checkers class TrustedPeerChecker(object): implements(checkers.ICredentialsChecker) credentialInterfaces = (ITrustedPeerCredentials,) def __init__(self, trusted_peers): self.trusted_peers = trusted_peers def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have a 'peer' attribute, or an UnauthorizedLogin in case of a failure.""" if credentials.checkPeer(self.trusted_peers): return defer.succeed(credentials.peer) return defer.fail(credError.UnauthorizedLogin()) class PublicGetApplicationChecker(object): implements(checkers.ICredentialsChecker) credentialInterfaces = (IPublicGetApplicationCredentials,) def requestAvatarId(self, credentials): """We already know that the method is GET and the application is a 'public GET application', we just need to say that the authentication succeeded.""" if credentials.checkApplication(): return defer.succeed(None) return defer.fail(credError.UnauthorizedLogin()) ## avatars class IAuthUser(Interface): pass class ITrustedPeer(Interface): pass class IPublicGetApplication(Interface): pass class AuthUser(str): """Authenticated XCAP User avatar.""" implements(IAuthUser) class TrustedPeer(str): """Trusted peer avatar.""" implements(ITrustedPeer) class PublicGetApplication(str): """Public get application avatar.""" implements(IPublicGetApplication) ## realm class XCAPAuthRealm(object): """XCAP authentication realm. Receives an avatar ID (a string identifying the user) and a list of interfaces the avatar needs to support. It returns an avatar that encapsulates data about that user.""" implements(portal.IRealm) def requestAvatar(self, avatarId, mind, *interfaces): if IAuthUser in interfaces: return IAuthUser, AuthUser(avatarId) elif ITrustedPeer in interfaces: return ITrustedPeer, TrustedPeer(avatarId) elif IPublicGetApplication in interfaces: return IPublicGetApplication, PublicGetApplication(avatarId) raise NotImplementedError("Only IAuthUser and ITrustedPeer interfaces are supported") def get_cred(request, default_realm): auth = request.headers.getHeader('authorization') if auth: typ, data = auth if typ == 'basic': return data.decode('base64').split(':', 1)[0], default_realm elif typ == 'digest': raise NotImplementedError return None, default_realm ## authentication wrapper for XCAP resources class XCAPAuthResource(HTTPAuthResource): def allowedMethods(self): return ('GET', 'PUT', 'DELETE') def _updateRealm(self, realm): """Updates the realm of the attached credential factories.""" for factory in self.credentialFactories.values(): factory.realm = realm def authenticate(self, request): """Authenticates an XCAP request.""" parsed_url = urlparse.urlparse(request.uri) if request.port in (80, 443): uri = request.scheme + "://" + request.host + parsed_url.path else: uri = request.scheme + "://" + request.host + ":" + str(request.port) + parsed_url.path if parsed_url.query: uri += "?%s" % parsed_url.query xcap_uri = parseNodeURI(uri, AuthenticationConfig.default_realm) request.xcap_uri = xcap_uri if xcap_uri.doc_selector.context=='global': return defer.succeed(self.wrappedResource) ## For each request the authentication realm must be ## dinamically deducted from the XCAP request URI realm = xcap_uri.user.domain if realm is None: raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)') if not xcap_uri.user.username: # for 'global' requests there's no username@domain in the URI, # so we will use username and domain from Authorization header xcap_uri.user.username, xcap_uri.user.domain = get_cred(request, AuthenticationConfig.default_realm) self._updateRealm(realm) # If we receive a GET to a 'public GET application' we will not authenticate it if request.method == "GET" and public_get_applications.has_key(xcap_uri.application_id): return self.portal.login(PublicGetApplicationCredentials(), None, IPublicGetApplication ).addCallbacks(self._loginSucceeded, self._publicGetApplicationLoginFailed, (request,), None, (request,), None) remote_addr = request.remoteAddr.host if AuthenticationConfig.trusted_peers: return self.portal.login(TrustedPeerCredentials(remote_addr), None, ITrustedPeer ).addCallbacks(self._loginSucceeded, self._trustedPeerLoginFailed, (request,), None, (request,), None) return HTTPAuthResource.authenticate(self, request) def _trustedPeerLoginFailed(self, result, request): """If the peer is not trusted, fallback to HTTP basic/digest authentication.""" return HTTPAuthResource.authenticate(self, request) def _publicGetApplicationLoginFailed(self, result, request): return HTTPAuthResource.authenticate(self, request) def _loginSucceeded(self, avatar, request): """Authorizes an XCAP request after it has been authenticated.""" interface, avatar_id = avatar ## the avatar is the authenticated XCAP User xcap_uri = request.xcap_uri application = getApplicationForURI(xcap_uri) if not application: raise ResourceNotFound if interface is IAuthUser and application.is_authorized(XCAPUser.parse(avatar_id), xcap_uri): return HTTPAuthResource._loginSucceeded(self, avatar, request) elif interface is ITrustedPeer or interface is IPublicGetApplication: return HTTPAuthResource._loginSucceeded(self, avatar, request) else: return failure.Failure( http.HTTPError( UnauthorizedResponse( self.credentialFactories, request.remoteAddr))) def locateChild(self, request, seg): """ Authenticate the request then return the C{self.wrappedResource} and the unmodified segments. We're not using path location, we want to fall back to the renderHTTP() call. """ #return self.authenticate(request), seg return self, server.StopTraversal def renderHTTP(self, request): """ Authenticate the request then return the result of calling renderHTTP on C{self.wrappedResource} """ if request.method not in self.allowedMethods(): response = http.Response(responsecode.NOT_ALLOWED) response.headers.setHeader("allow", self.allowedMethods()) return response def _renderResource(resource): return resource.renderHTTP(request) def _finished_reading(ignore, result): data = ''.join(result) request.attachment = data d = self.authenticate(request) d.addCallback(_renderResource) return d if request.method in ('PUT', 'DELETE'): # we need to authenticate the request after all the attachment stream # has been read # QQQ DELETE doesn't have any attachments, does it? nor does GET. # QQQ Reading attachment when there isn't one won't hurt, will it? # QQQ So why don't we just do it all the time for all requests? data = [] d = stream.readStream(request.stream, data.append) d.addCallback(_finished_reading, data) return d else: d = self.authenticate(request) d.addCallback(_renderResource) return d diff --git a/xcap/backend/__init__.py b/xcap/backend/__init__.py index c90c897..010e65c 100644 --- a/xcap/backend/__init__.py +++ b/xcap/backend/__init__.py @@ -1,55 +1,52 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - """Interface to the backend subsystem""" __all__ = ['database', 'opensips'] from zope.interface import Interface class StatusResponse(object): def __init__(self, code, etag=None, data=None, old_etag=None): self.code = code self.etag = etag self.data = data self.old_etag = old_etag @property def succeed(self): return 200 <= self.code <= 299 class StorageError(Exception): pass class IStorage(Interface): """Storage interface. It defines the methods an XCAP storage class must implement.""" def get_document(self, uri, check_etag): """Fetch an XCAP document. @param uri: an XCAP URI that contains the XCAP user and the document selector @param check_etag: a callable used to check the etag of the stored document @returns: a deferred that'll be fired when the document is fetched""" def put_document(self, uri, document, check_etag): """Insert or replace an XCAP document. @param uri: an XCAP URI that contains the XCAP user and the document selector @param document: the XCAP document @param check_etag: a callable used to check the etag of the stored document @returns: a deferred that'll be fired when the action was completed.""" def delete_document(self, uri, check_etag): """Delete an XCAP document. @param uri: an XCAP URI that contains the XCAP user and the document selector @param check_etag: a callable used to check the etag of the document to be deleted """ diff --git a/xcap/backend/database.py b/xcap/backend/database.py index b449065..7ece902 100644 --- a/xcap/backend/database.py +++ b/xcap/backend/database.py @@ -1,410 +1,407 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - """Implementation of a database backend.""" import sys from application import log from application.configuration import ConfigSection from application.python.types import Singleton from zope.interface import implements from twisted.cred import credentials, checkers, error as credError from twisted.internet import defer from _mysql_exceptions import IntegrityError import xcap from xcap.backend import IStorage, StatusResponse from xcap.dbutil import connectionForURI, repeat_on_error, make_random_etag class Config(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Database' authentication_db_uri = '' storage_db_uri = '' subscriber_table = 'subscriber' user_col = 'username' domain_col = 'domain' password_col = 'password' ha1_col = 'ha1' xcap_table = 'xcap' if not Config.authentication_db_uri or not Config.storage_db_uri: log.fatal("Authentication DB URI and Storage DB URI must be provided") sys.exit(1) class DBBase(object): def __init__(self): self._db_connect() class PasswordChecker(DBBase): """A credentials checker against a database subscriber table.""" implements(checkers.ICredentialsChecker) credentialInterfaces = (credentials.IUsernamePassword, credentials.IUsernameHashedPassword) def _db_connect(self): self.conn = auth_db_connection(Config.authentication_db_uri) def _query_credentials(self, credentials): raise NotImplementedError def _got_query_results(self, rows, credentials): if not rows: raise credError.UnauthorizedLogin("Unauthorized login") else: return self._authenticate_credentials(rows[0][0], credentials) def _authenticate_credentials(self, password, credentials): raise NotImplementedError def _checkedPassword(self, matched, username, realm): if matched: username = username.split('@', 1)[0] ## this is the avatar ID return "%s@%s" % (username, realm) else: raise credError.UnauthorizedLogin("Unauthorized login") def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have the username and realm attributes, or an UnauthorizedLogin in case of a failure.""" d = self._query_credentials(credentials) return d class PlainPasswordChecker(PasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored in plain text.""" implements(checkers.ICredentialsChecker) def _query_credentials(self, credentials): username, domain = credentials.username.split('@', 1)[0], credentials.realm query = """SELECT %(password_col)s FROM %(table)s WHERE %(user_col)s = %%(username)s AND %(domain_col)s = %%(domain)s""" % { "password_col": Config.password_col, "user_col": Config.user_col, "domain_col": Config.domain_col, "table": Config.subscriber_table } params = {"username": username, "domain": domain} return self.conn.runQuery(query, params).addCallback(self._got_query_results, credentials) def _authenticate_credentials(self, hash, credentials): return defer.maybeDeferred( credentials.checkPassword, hash).addCallback( self._checkedPassword, credentials.username, credentials.realm) class HashPasswordChecker(PasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored as MD5 hashes.""" implements(checkers.ICredentialsChecker) def _query_credentials(self, credentials): username, domain = credentials.username.split('@', 1)[0], credentials.realm query = """SELECT %(ha1_col)s FROM %(table)s WHERE %(user_col)s = %%(username)s AND %(domain_col)s = %%(domain)s""" % { "ha1_col": Config.ha1_col, "user_col": Config.user_col, "domain_col": Config.domain_col, "table": Config.subscriber_table} params = {"username": username, "domain": domain} return self.conn.runQuery(query, params).addCallback(self._got_query_results, credentials) def _authenticate_credentials(self, hash, credentials): return defer.maybeDeferred( credentials.checkHash, hash).addCallback( self._checkedPassword, credentials.username, credentials.realm) class Error(Exception): def __init__(self): if hasattr(self, 'msg'): return Exception.__init__(self, self.msg) else: return Exception.__init__(self) class RaceError(Error): """The errors of this type are raised for the requests that failed because of concurrent modification of the database by other clients. For example, before DELETE we do SELECT first, to check that a document of the right etag exists. The actual check is performed by a function in twisted that is passed as a callback. Then etag from the SELECT request is used in the DELETE request. This seems unnecessary convoluted and probably should be changed to 'DELETE .. WHERE etag=ETAG'. We still need to find out whether DELETE was actually performed. """ class UpdateFailed(RaceError): msg = 'UPDATE request failed' class DeleteFailed(RaceError): msg = 'DELETE request failed' class MultipleResultsError(Error): """This should never happen. If it did happen. that means either the table was corrupted or there's a logic error""" def __init__(self, params): Exception.__init__(self, 'database request has more than one result: ' + repr(params)) class Storage(DBBase): __metaclass__ = Singleton implements(IStorage) app_mapping = {"pres-rules" : 1<<1, "resource-lists" : 1<<2, "rls-services" : 1<<3, "pidf-manipulation" : 1<<4, "org.openmobilealliance.pres-rules" : 1<<5, "org.openmobilealliance.pres-content" : 1<<6, "org.openxcap.dialog-rules" : 1<<7, "test-app" : 0} def _db_connect(self): self.conn = storage_db_connection(Config.storage_db_uri) def _normalize_document_path(self, uri): if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): # some clients e.g. counterpath's eyebeam save presence rules under # different filenames between versions and they expect to find the same # information, thus we are forcing all presence rules documents to be # saved under "index.xml" default filename uri.doc_selector.document_path = "index.xml" def _get_document(self, trans, uri, check_etag): username, domain = uri.user.username, uri.user.domain self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] query = """SELECT doc, etag FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s AND doc_type= %%(doc_type)s AND doc_uri=%%(document_path)s""" % { "table": Config.xcap_table} params = {"username": username, "domain" : domain, "doc_type": doc_type, "document_path": uri.doc_selector.document_path} trans.execute(query, params) result = trans.fetchall() if len(result)>1: raise MultipleResultsError(params) elif result: doc, etag = result[0] if isinstance(doc, unicode): doc = doc.encode('utf-8') check_etag(etag) response = StatusResponse(200, etag, doc) else: response = StatusResponse(404) return response def _put_document(self, trans, uri, document, check_etag): username, domain = uri.user.username, uri.user.domain self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] document_path = uri.doc_selector.document_path query = """SELECT etag FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s AND doc_type= %%(doc_type)s AND doc_uri=%%(document_path)s""" % { "table": Config.xcap_table} params = {"username": username, "domain" : domain, "doc_type": doc_type, "document_path": document_path} trans.execute(query, params) result = trans.fetchall() if len(result) > 1: raise MultipleResultsError(params) elif not result: check_etag(None, False) ## the document doesn't exist, create it etag = make_random_etag(uri) query = """INSERT INTO %(table)s (username, domain, doc_type, etag, doc, doc_uri) VALUES (%%(username)s, %%(domain)s, %%(doc_type)s, %%(etag)s, %%(document)s, %%(document_path)s)""" % { "table": Config.xcap_table } params = {"username": username, "domain" : domain, "doc_type": doc_type, "etag": etag, "document": document, "document_path": document_path} # may raise IntegrityError here, if the document was created in another connection # will be catched by repeat_on_error trans.execute(query, params) return StatusResponse(201, etag) else: old_etag = result[0][0] ## first check the etag of the existing resource check_etag(old_etag) ## the document exists, replace it etag = make_random_etag(uri) query = """UPDATE %(table)s SET doc = %%(document)s, etag = %%(etag)s WHERE username = %%(username)s AND domain = %%(domain)s AND doc_type = %%(doc_type)s AND etag = %%(old_etag)s AND doc_uri = %%(document_path)s""" % { "table": Config.xcap_table } params = {"document": document, "etag": etag, "username": username, "domain" : domain, "doc_type": doc_type, "old_etag": old_etag, "document_path": document_path} trans.execute(query, params) # the request may not update anything (e.g. if etag was changed by another connection # after we did SELECT); if so, we should retry updated = getattr(trans._connection, 'affected_rows', lambda : 1)() if not updated: raise UpdateFailed assert updated == 1, updated return StatusResponse(200, etag, old_etag=old_etag) def _delete_document(self, trans, uri, check_etag): username, domain = uri.user.username, uri.user.domain self._normalize_document_path(uri) doc_type = self.app_mapping[uri.application_id] document_path = uri.doc_selector.document_path query = """SELECT etag FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s AND doc_type= %%(doc_type)s AND doc_uri = %%(document_path)s""" % { "table": Config.xcap_table} params = {"username": username, "domain" : domain, "doc_type": doc_type, "document_path": document_path} trans.execute(query, params) result = trans.fetchall() if len(result)>1: raise MultipleResultsError(params) elif result: etag = result[0][0] check_etag(etag) query = """DELETE FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s AND doc_type= %%(doc_type)s AND doc_uri = %%(document_path)s AND etag = %%(etag)s""" % {"table" : Config.xcap_table} params = {"username": username, "domain" : domain, "doc_type": doc_type, "document_path": document_path, "etag": etag} trans.execute(query, params) deleted = getattr(trans._connection, 'affected_rows', lambda : 1)() if not deleted: # the document was replaced/removed after the SELECT but before the DELETE raise DeleteFailed assert deleted == 1, deleted return StatusResponse(200, old_etag=etag) else: return StatusResponse(404) def _delete_all_documents(self, trans, uri): username, domain = uri.user.username, uri.user.domain query = """DELETE FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s """ % {"table" : Config.xcap_table} params = {"username": username, "domain" : domain} trans.execute(query, params) return StatusResponse(200) def get_document(self, uri, check_etag): return self.conn.runInteraction(self._get_document, uri, check_etag) def put_document(self, uri, document, check_etag): return repeat_on_error(10, (UpdateFailed, IntegrityError), self.conn.runInteraction, self._put_document, uri, document, check_etag) def delete_document(self, uri, check_etag): return repeat_on_error(10, DeleteFailed, self.conn.runInteraction, self._delete_document, uri, check_etag) def delete_documents(self, uri, check_etag): return self.conn.runInteraction(self._delete_all_documents, uri) # Application-specific functions def _get_watchers(self, trans, uri): status_mapping = {1: "allow", 2: "confirm", 3: "deny"} presentity_uri = "sip:%s@%s" % (uri.user.username, uri.user.domain) query = """SELECT watcher_username, watcher_domain, status FROM watchers WHERE presentity_uri = %(puri)s""" params = {'puri': presentity_uri} trans.execute(query, params) result = trans.fetchall() watchers = [{"id": "%s@%s" % (w_user, w_domain), "status": status_mapping.get(subs_status, "unknown"), "online": "false"} for w_user, w_domain, subs_status in result] query = """SELECT watcher_username, watcher_domain FROM active_watchers WHERE presentity_uri = %(puri)s AND event = 'presence'""" trans.execute(query, params) result = trans.fetchall() active_watchers = set("%s@%s" % pair for pair in result) for watcher in watchers: if watcher["id"] in active_watchers: watcher["online"] = "true" return watchers def get_watchers(self, uri): return self.conn.runInteraction(self._get_watchers, uri) def _get_documents_list(self, trans, uri): query = """SELECT doc_type, doc_uri, etag FROM %(table)s WHERE username = %%(username)s AND domain = %%(domain)s""" % {'table': Config.xcap_table} params = {'username': uri.user.username, 'domain': uri.user.domain} trans.execute(query, params) result = trans.fetchall() docs = {} for r in result: app = [k for k, v in self.app_mapping.iteritems() if v == r[0]][0] if docs.has_key(app): docs[app].append((r[1], r[2])) else: docs[app] = [(r[1], r[2])] # Ex: {'pres-rules': [('index.html', '4564fd9c9a2a2e3e796310b00c9908aa')]} return docs def get_documents_list(self, uri): return self.conn.runInteraction(self._get_documents_list, uri) installSignalHandlers = True def auth_db_connection(uri): conn = connectionForURI(uri) return conn def storage_db_connection(uri): conn = connectionForURI(uri) def cb(res): if res[0:1][0:1] and res[0][0]: print '%s xcap documents in the database' % res[0][0] return res def eb(fail): fail.printTraceback() return fail # connect early, so database problem are detected early d = conn.runQuery('SELECT count(*) from %s' % Config.xcap_table) d.addCallback(cb) d.addErrback(eb) return conn diff --git a/xcap/backend/opensips.py b/xcap/backend/opensips.py index 28e7e1f..b340d9b 100644 --- a/xcap/backend/opensips.py +++ b/xcap/backend/opensips.py @@ -1,126 +1,123 @@ -# Copyright (C) 2007-2010 AG-Projects. -# - """Implementation of an OpenSIPS backend.""" import re from application import log from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from sipsimple.core import Engine, FromHeader, Header, Publication, RouteHeader, SIPURI from sipsimple.configuration.datatypes import SIPProxyAddress from sipsimple.threading import run_in_twisted_thread from zope.interface import implements import xcap from xcap.datatypes import XCAPRootURI from xcap.backend import database from xcap.xcapdiff import Notifier class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' address = ConfigSetting(type=IPAddress, value='0.0.0.0') root = ConfigSetting(type=XCAPRootURI, value=None) class Config(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'OpenSIPS' publish_xcapdiff = False outbound_sip_proxy = '' class PlainPasswordChecker(database.PlainPasswordChecker): pass class HashPasswordChecker(database.HashPasswordChecker): pass class SIPNotifier(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.engine = Engine() self.engine.start( ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, user_agent="OpenXCAP %s" % xcap.__version__, ) self.sip_prefix_re = re.compile('^sips?:') try: self.outbound_proxy = SIPProxyAddress.from_description(Config.outbound_sip_proxy) except ValueError: log.warning('Invalid SIP proxy address specified: %s' % Config.outbound_sip_proxy) self.outbound_proxy = None def send_publish(self, uri, body): if self.outbound_proxy is None: return uri = self.sip_prefix_re.sub('', uri) publication = Publication(FromHeader(SIPURI(uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) NotificationCenter().add_observer(self, sender=publication) route_header = RouteHeader(SIPURI(host=self.outbound_proxy.host, port=self.outbound_proxy.port, parameters=dict(transport=self.outbound_proxy.transport))) publication.publish(body, route_header, timeout=5) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPPublicationDidSucceed(self, notification): log.msg("PUBLISH for xcap-diff event successfully sent to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) def _NH_SIPPublicationDidEnd(self, notification): log.msg("PUBLISH for xcap-diff event ended for %s" % notification.sender.from_header.uri) notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidFail(self, notification): log.msg("PUBLISH for xcap-diff event failed to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) notification.center.remove_observer(self, sender=notification.sender) class NotifyingStorage(database.Storage): def __init__(self): super(NotifyingStorage, self).__init__() self._sip_notifier = SIPNotifier() self.notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) def put_document(self, uri, document, check_etag): d = super(NotifyingStorage, self).put_document(uri, document, check_etag) d.addCallback(lambda result: self._on_put(result, uri)) return d def _on_put(self, result, uri): if result.succeed: self.notifier.on_change(uri, result.old_etag, result.etag) return result def delete_document(self, uri, check_etag): d = super(NotifyingStorage, self).delete_document(uri, check_etag) d.addCallback(lambda result: self._on_delete(result, uri)) return d def _on_delete(self, result, uri): if result.succeed: self.notifier.on_change(uri, result.old_etag, None) return result if Config.publish_xcapdiff: Storage = NotifyingStorage else: Storage = database.Storage installSignalHandlers = database.installSignalHandlers diff --git a/xcap/backend/sipthor.py b/xcap/backend/sipthor.py index 2dd4aea..fa07973 100644 --- a/xcap/backend/sipthor.py +++ b/xcap/backend/sipthor.py @@ -1,601 +1,596 @@ -# Copyright (C) 2007-2010 AG-Projects. -# -# This module is proprietary to AG Projects. Use of this module by third -# parties is not supported. - import re import signal import cjson from formencode import validators from application import log from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from application.system import host from application.process import process from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress from sqlobject import sqlhub, connectionForURI, SQLObject, AND from sqlobject import StringCol, IntCol, DateTimeCol, SOBLOBCol, Col from sqlobject import MultipleJoin, ForeignKey from zope.interface import implements from twisted.internet import reactor from twisted.internet import defer from twisted.internet.defer import Deferred, maybeDeferred from twisted.cred.checkers import ICredentialsChecker from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword from twisted.cred.error import UnauthorizedLogin from thor.link import ControlLink, Notification, Request from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from gnutls.interfaces.twisted import X509Credentials from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL from sipsimple.core import Engine, FromHeader, Header, Publication, RouteHeader, SIPURI from sipsimple.threading import run_in_twisted_thread import xcap from xcap.tls import Certificate, PrivateKey from xcap.backend import StatusResponse from xcap.datatypes import XCAPRootURI from xcap.dbutil import make_random_etag from xcap.xcapdiff import Notifier class ThorNodeConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'ThorNetwork' domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' address = ConfigSetting(type=IPAddress, value='0.0.0.0') root = ConfigSetting(type=XCAPRootURI, value=None) class JSONValidator(validators.Validator): def to_python(self, value, state): if value is None: return None try: return cjson.decode(value) except Exception: raise validators.Invalid("expected a decodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) def from_python(self, value, state): if value is None: return None try: return cjson.encode(value) except Exception: raise validators.Invalid("expected an encodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) class SOJSONCol(SOBLOBCol): def createValidators(self): return [JSONValidator()] + super(SOJSONCol, self).createValidators() class JSONCol(Col): baseClass = SOJSONCol class SipAccount(SQLObject): class sqlmeta: table = 'sip_accounts_meta' username = StringCol(length=64) domain = StringCol(length=64) firstName = StringCol(length=64) lastName = StringCol(length=64) email = StringCol(length=64) customerId = IntCol(default=0) resellerId = IntCol(default=0) ownerId = IntCol(default=0) changeDate = DateTimeCol(default=DateTimeCol.now) ## joins data = MultipleJoin('SipAccountData', joinColumn='account_id') def _set_profile(self, value): data = list(self.data) if not data: SipAccountData(account=self, profile=value) else: data[0].profile = value def _get_profile(self): return self.data[0].profile def set(self, **kwargs): kwargs = kwargs.copy() profile = kwargs.pop('profile', None) SQLObject.set(self, **kwargs) if profile is not None: self._set_profile(profile) class SipAccountData(SQLObject): class sqlmeta: table = 'sip_accounts_data' account = ForeignKey('SipAccount', cascade=True) profile = JSONCol() class ThorEntityAddress(str): def __new__(cls, ip, control_port=None, version='unknown'): instance = str.__new__(cls, ip) instance.ip = ip instance.version = version instance.control_port = control_port return instance class GetSIPWatchers(Request): def __new__(cls, account): command = "get sip_watchers for %s" % account instance = Request.__new__(cls, command) return instance class XCAPProvisioning(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): self._database = DatabaseConnection() self.node = ThorEntity(host.default_ip if ServerConfig.address == '0.0.0.0' else ServerConfig.address, ['xcap_server'], version=xcap.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) self.control = ControlLink(credentials) EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) def _disconnect_all(self, result): self.control.disconnect_all() EventServiceClient._disconnect_all(self, result) def lookup(self, key): network = self.networks.get("sip_proxy", None) if network is None: return None try: node = network.lookup_node(key) except LookupError: node = None except Exception: log.err() node = None return node def notify(self, operation, entity_type, entity): node = self.lookup(entity) if node is not None: if node.control_port is None: log.error("Could not send notify because node %s has no control port" % node.ip) return self.control.send_request(Notification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port)) def get_watchers(self, key): node = self.lookup(key) if node is None: return defer.fail("no nodes found when searching for key %s" % str(key)) if node.control_port is None: return defer.fail("could not send notify because node %s has no control port" % node.ip) request = GetSIPWatchers(key) request.deferred = Deferred() self.control.send_request(request, (node.ip, node.control_port)) return request.deferred def handle_event(self, event): # print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role thor_databases = role_map.get('thor_database', []) if thor_databases: thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip)) dburi = thor_databases[0].dburi else: dburi = None self._database.update_dburi(dburi) all_roles = role_map.keys() + networks.keys() for role in all_roles: try: network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]: continue else: network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])]) old_nodes = set(network.nodes) ## compute set differences added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) self.control.discard_link((node.ip, node.control_port)) plural = len(removed_nodes) != 1 and 's' or '' log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) #print "Thor %s nodes: %s" % (role, str(network.nodes)) class NotFound(Exception): pass class NoDatabase(Exception): pass class DatabaseConnection(object): __metaclass__ = Singleton def __init__(self): self.dburi = None # Methods to be called from the Twisted thread: def put(self, uri, document, check_etag, new_etag): defer = Deferred() operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def delete(self, uri, check_etag): defer = Deferred() operation = lambda profile: self._delete_operation(uri, check_etag, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def delete_all(self, uri): defer = Deferred() operation = lambda profile: self._delete_all_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def get(self, uri): defer = Deferred() operation = lambda profile: self._get_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) return defer def get_profile(self, username, domain): defer = Deferred() reactor.callInThread(self.retrieve_profile, username, domain, lambda profile: profile, False, defer) return defer def get_documents_list(self, uri): defer = Deferred() operation = lambda profile: self._get_documents_list_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) return defer # Methods to be called in a separate thread: def _put_operation(self, uri, document, check_etag, new_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: found = False etag = None check_etag(None, False) else: found = True check_etag(etag) xcap_app = xcap_docs.setdefault(uri.application_id, {}) xcap_app[uri.doc_selector.document_path] = (document, new_etag) return (found, etag, new_etag) def _delete_operation(self, uri, check_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: raise NotFound() check_etag(etag) del(xcap_docs[uri.application_id][uri.doc_selector.document_path]) return (etag) def _delete_all_operation(self, uri, profile): xcap_docs = profile.setdefault("xcap", {}) xcap_docs.clear() return None def _get_operation(self, uri, profile): try: xcap_docs = profile["xcap"] doc, etag = xcap_docs[uri.application_id][uri.doc_selector.document_path] except KeyError: raise NotFound() return doc, etag def _get_documents_list_operation(self, uri, profile): try: xcap_docs = profile["xcap"] except KeyError: raise NotFound() return xcap_docs def retrieve_profile(self, username, domain, operation, update, defer): transaction = None try: if self.dburi is None: raise NoDatabase() transaction = sqlhub.processConnection.transaction() try: db_account = SipAccount.select(AND(SipAccount.q.username == username, SipAccount.q.domain == domain), connection = transaction, forUpdate = update)[0] except IndexError: raise NotFound() profile = db_account.profile result = operation(profile) # NB: may modify profile! if update: db_account.profile = profile transaction.commit(close=True) except Exception, e: if transaction: transaction.rollback() reactor.callFromThread(defer.errback, e) else: reactor.callFromThread(defer.callback, result) finally: if transaction: transaction.cache.clear() def update_dburi(self, dburi): if self.dburi != dburi: if self.dburi is not None: sqlhub.processConnection.close() if dburi is None: sqlhub.processConnection else: sqlhub.processConnection = connectionForURI(dburi) self.dburi = dburi class SipthorPasswordChecker(object): implements(ICredentialsChecker) credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword) def __init__(self): self._database = DatabaseConnection() def _query_credentials(self, credentials): username, domain = credentials.username.split('@', 1)[0], credentials.realm result = self._database.get_profile(username, domain) result.addCallback(self._got_query_results, credentials) result.addErrback(self._got_unsuccessfull) return result def _got_unsuccessfull(self, failure): failure.trap(NotFound) raise UnauthorizedLogin("Unauthorized login") def _got_query_results(self, profile, credentials): return self._authenticate_credentials(profile, credentials) def _authenticate_credentials(self, profile, credentials): raise NotImplementedError def _checkedPassword(self, matched, username, realm): if matched: username = username.split('@', 1)[0] ## this is the avatar ID return "%s@%s" % (username, realm) else: raise UnauthorizedLogin("Unauthorized login") def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have the username and realm attributes, or an UnauthorizedLogin in case of a failure.""" d = self._query_credentials(credentials) return d class PlainPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored in plain text.""" implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkPassword, profile["password"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) class HashPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored as MD5 hashes.""" implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkHash, profile["ha1"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) class SIPNotifier(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.provisioning = XCAPProvisioning() self.engine = Engine() self.engine.start( ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, user_agent="OpenXCAP %s" % xcap.__version__, ) def send_publish(self, uri, body): uri = re.sub("^(sip:|sips:)", "", uri) destination_node = self.provisioning.lookup(uri) if destination_node is not None: # TODO: add configuration settings for SIP transport. -Saul publication = Publication(FromHeader(SIPURI(uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) NotificationCenter().add_observer(self, sender=publication) route_header = RouteHeader(SIPURI(host=str(destination_node), port='5060', parameters=dict(transport='udp'))) publication.publish(body, route_header, timeout=5) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPPublicationDidSucceed(self, notification): log.msg("PUBLISH for xcap-diff event successfully sent to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) def _NH_SIPPublicationDidEnd(self, notification): log.msg("PUBLISH for xcap-diff event ended for %s" % notification.sender.from_header.uri) NotificationCenter().remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidFail(self, notification): log.msg("PUBLISH for xcap-diff event failed to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) NotificationCenter().remove_observer(self, sender=notification.sender) class Storage(object): __metaclass__ = Singleton def __init__(self): self._database = DatabaseConnection() self._provisioning = XCAPProvisioning() self._sip_notifier = SIPNotifier() self._notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) def _normalize_document_path(self, uri): if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): # some clients e.g. counterpath's eyebeam save presence rules under # different filenames between versions and they expect to find the same # information, thus we are forcing all presence rules documents to be # saved under "index.xml" default filename uri.doc_selector.document_path = "index.xml" def get_document(self, uri, check_etag): self._normalize_document_path(uri) result = self._database.get(uri) result.addCallback(self._got_document, check_etag) result.addErrback(self._eb_not_found) return result def _eb_not_found(self, failure): failure.trap(NotFound) return StatusResponse(404) def _got_document(self, (doc, etag), check_etag): check_etag(etag) return StatusResponse(200, etag, doc.encode('utf-8')) def put_document(self, uri, document, check_etag): document = document.decode('utf-8') self._normalize_document_path(uri) etag = make_random_etag(uri) result = self._database.put(uri, document, check_etag, etag) result.addCallback(self._cb_put, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_put(self, result, uri, thor_key): if result[0]: code = 200 else: code = 201 self._provisioning.notify("update", "sip_account", thor_key) self._notifier.on_change(uri, result[1], result[2]) return StatusResponse(code, result[2]) def delete_documents(self, uri): result = self._database.delete_all(uri) result.addCallback(self._cb_delete_all, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_delete_all(self, result, uri, thor_key): self._provisioning.notify("update", "sip_account", thor_key) return StatusResponse(200) def delete_document(self, uri, check_etag): self._normalize_document_path(uri) result = self._database.delete(uri, check_etag) result.addCallback(self._cb_delete, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_delete(self, result, uri, thor_key): self._provisioning.notify("update", "sip_account", thor_key) self._notifier.on_change(uri, result[1], None) return StatusResponse(200) def get_watchers(self, uri): thor_key = "%s@%s" % (uri.user.username, uri.user.domain) result = self._provisioning.get_watchers(thor_key) result.addCallback(self._get_watchers_decode) return result def _get_watchers_decode(self, response): if response.code == 200: watchers = cjson.decode(response.data) for watcher in watchers: watcher["online"] = str(watcher["online"]).lower() return watchers else: print "error: %s" % response def get_documents_list(self, uri): result = self._database.get_documents_list(uri) result.addCallback(self._got_documents_list) result.addErrback(self._got_documents_list_error) return result def _got_documents_list(self, xcap_docs): docs = {} if xcap_docs: for k, v in xcap_docs.iteritems(): for k2, v2 in v.iteritems(): if docs.has_key(k): docs[k].append((k2, v2[1])) else: docs[k] = [(k2, v2[1])] return docs def _got_documents_list_error(self, failure): failure.trap(NotFound) return {} installSignalHandlers = False diff --git a/xcap/datatypes.py b/xcap/datatypes.py index 9213bd1..c2b7539 100644 --- a/xcap/datatypes.py +++ b/xcap/datatypes.py @@ -1,69 +1,66 @@ -# Copyright (c) 2007-2010 AG Projects. See LICENSE for details. -# - """Configuration data types""" import re import urlparse from application import log class XCAPRootURI(str): """An XCAP root URI and a number of optional aliases""" def __new__(cls, value): if value is None: return None elif not isinstance(value, basestring): raise TypeError("value must be a string, unicode or None") if value.strip() == '': return None valid_uris = [] for uri in re.split(r'\s*,\s*', value): scheme, host, path, params, query, fragment = urlparse.urlparse(uri) if host and scheme in ('http', 'https'): for u in valid_uris: if u == uri or uri.startswith(u) or u.startswith(uri): log.warn("ignoring XCAP Root URI %r (similar to %r)" % (uri, u)) break else: valid_uris.append(uri) else: log.warn("Invalid XCAP Root URI: %r" % uri) if not valid_uris: return None instance = str.__new__(cls, valid_uris[0]) instance.uris = tuple(valid_uris) return instance def _get_port_from_uri(self, uri): scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri) if scheme and netloc: if len(netloc.split(":")) == 2: try: port = int(netloc.split(":")[1]) except ValueError: return None else: return port if port < 65536 else None if scheme.lower() == "http": return 80 if scheme.lower() == "https": return 443 return None @property def aliases(self): return self.uris[1:] @property def port(self): listen_port = self._get_port_from_uri(self) if listen_port: for uri in self.aliases: if self._get_port_from_uri(uri) != listen_port: raise ValueError("All XCAP root aliases must have the same port number") return listen_port else: raise ValueError("Invalid port specified") diff --git a/xcap/dbutil.py b/xcap/dbutil.py index 80da08c..cfac5e8 100644 --- a/xcap/dbutil.py +++ b/xcap/dbutil.py @@ -1,139 +1,136 @@ -# Copyright (c) 2007-2010 AG Projects. See LICENSE for details. -# - """Database utilities""" import time import random from hashlib import md5 from twisted.enterprise import adbapi from twisted.python import reflect db_modules = {"mysql": "MySQLdb"} def make_random_etag(uri): return md5("%s%s%s" % (uri, time.time(), random.random())).hexdigest() def make_etag(uri, document): return md5("%s%s" % (uri, document)).hexdigest() def parseURI(uri): schema, rest = uri.split(':', 1) assert rest.startswith('//'), "DB URIs must start with scheme:// -- you did not include a / (in %r)" % rest rest = rest[2:] if rest.find('/') != -1: host, rest = rest.split('/', 1) else: raise ValueError("You MUST specify a database in the DB URI.") if host and host.find('@') != -1: user, host = host.split('@', 1) if user.find(':') != -1: user, password = user.split(':', 1) else: password = None if not user: raise ValueError("You MUST specify a user in the DB URI.") else: raise ValueError("You MUST specify a host in the DB URI.") if host and host.find(':') != -1: host, port = host.split(':') try: port = int(port) except ValueError: raise ValueError, "port must be integer, got '%s' instead" % port if not (1 <= port <= 65535): raise ValueError, "port must be integer in the range 1-65535, got '%d' instead" % port else: port = None db = rest return schema, user, password, host, port, db def connectionForURI(uri): """Return a Twisted adbapi connection pool for a given database URI.""" schema, user, password, host, port, db = parseURI(uri) try: module = db_modules[schema] except KeyError: raise ValueError("Database scheme '%s' is not supported." % schema) # Reconnecting is safe since we don't use transactions. # the following code prefers MySQLdb native reconnect if it's available, # falling back to twisted's cp_reconnect. # mysql's reconnect is preferred because it's better tested than twisted's # MySQLdb reconnect just works with version 1.2.2 it has been removed after kwargs = {} if module == 'MySQLdb': MySQLdb = reflect.namedModule(module) if MySQLdb.version_info[:3] == (1, 2, 2): kwargs.setdefault('reconnect', 1) kwargs.setdefault('host', host or 'localhost') kwargs.setdefault('port', int(port or '3306')) kwargs.setdefault('user', user or '') kwargs.setdefault('passwd', password or '') kwargs.setdefault('db', db) args = () if 'reconnect' not in kwargs: # note that versions other than 1.2.2 of MySQLdb don't provide reconnect parameter. # hopefully, if underlying reconnect was enabled, twisted will never see # a disconnect and its reconnection code won't interfere. kwargs.setdefault('cp_reconnect', 1) kwargs.setdefault('cp_noisy', False) pool = adbapi.ConnectionPool(module, *args, **kwargs) pool.schema = schema return pool def repeat_on_error(N, errorinfo, func, *args, **kwargs): d = func(*args, **kwargs) counter = [N] def try_again(error): if isinstance(error.value, errorinfo) and counter[0]>0: counter[0] -= 1 d = func(*args, **kwargs) d.addErrback(try_again) return d return error d.addErrback(try_again) return d if __name__=='__main__': from twisted.internet import defer def s(): print 's()' return defer.succeed(True) def f(): print 'f()' return defer.fail(ZeroDivisionError()) def getcb(msg): def callback(x): print '%s callback: %r' % (msg, x) def errback(x): print '%s errback: %r' % (msg, x) return callback, errback # calls s()'s callback d = repeat_on_error(1, Exception, s) d.addCallbacks(*getcb('s')) # calls f() for 4 times (1+3), then gives up and calls last f()'s errback d = repeat_on_error(3, Exception, f) d.addCallbacks(*getcb('f')) x = Exception() x.lst = [f, f, s] def bad_func(): f, x.lst = x.lst[0], x.lst[1:] return f() d = repeat_on_error(1, Exception, bad_func) d.addCallbacks(*getcb('bad_func')) diff --git a/xcap/element.py b/xcap/element.py index f526916..e630871 100644 --- a/xcap/element.py +++ b/xcap/element.py @@ -1,875 +1,872 @@ -# Copyright (c) 2007-2010 AG Projects. See LICENSE for details. -# - """Element handling as described in RFC 4825. This module implements * location of an element in xml document * location of insertion point for a new element in xml document This allows to implement GET/PUT/DELETE for elements in XCAP server. Syntax for element selectors is a subset of XPATH, but an XPATH implementation was not used. One reason is that XPATH only implements locating an element but not an insertion point for an element selector which does not point to an existing element (but will point to the inserted element after PUT). For element selectors of type *[@att="value"] insertion point depends on the content of a new element. For RFC compliant behavior, fix such requests by replacing '*' with the root tag of the new element. """ from StringIO import StringIO from xcap import uri from xml import sax def make_parser(): parser = sax.make_parser(['xcap.sax.expatreader']) parser.setFeature(sax.handler.feature_namespaces, 1) parser.setFeature(sax.handler.feature_namespace_prefixes, 1) return parser class ThrowEventsAway(sax.ContentHandler): pass def check_xml_fragment(element_str): """Run SAX parser on element_str to check its well-formedness. Ignore unbound namespaces prefixes. >>> check_xml_fragment("") >>> check_xml_fragment(''' ... Test ... ''') >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: mismatched tag >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:5: not well-formed (invalid token) >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: junk after document element >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:4: not well-formed (invalid token) """ parser = sax.make_parser(['xcap.sax.expatreader']) # ignore namespaces and prefixes parser.setFeature(sax.handler.feature_namespaces, 0) parser.setFeature(sax.handler.feature_namespace_prefixes, 0) parser.setContentHandler(ThrowEventsAway()) parser.parse(StringIO(element_str)) class Step(object): # to be matched against uri.Step def __init__(self, name, position = 0): self.name = name # this integer holds index of a child element currently in processing self.position = position def __repr__(self): return '%s[pos=%s]' % (self.name, self.position) class ContentHandlerBase(sax.ContentHandler): def __init__(self, selector): sax.ContentHandler.__init__(self) self.selector = selector self.state = None self.locator = None def setDocumentLocator(self, locator): self.locator = locator def pos(self): return self.locator._ref._parser.CurrentByteIndex def set_state(self, new_state): #print new_state, 'at %s' % str(self.pos()) self.state = new_state def set_end_pos(self, end_pos, end_tag = None, end_pos_2 = None): self.end_pos = end_pos self.end_tag = end_tag self.end_pos_2 = end_pos_2 def fix_end_pos(self, document): if self.end_tag is not None and self.end_tag in document[self.end_pos:self.end_pos_2]: if self.end_pos_2 is None: self.end_pos = 1 + document.index('>', self.end_pos) else: self.end_pos = 1 + document.index('>', self.end_pos, self.end_pos_2) def __repr__(self): return '<%s selector=%r state=%r>' % (self.__class__.__name__, self.selector, self.state) class ElementLocator(ContentHandlerBase): """Locates element in a document by element selector expression (subset of XPATH defined in RFC 4825) There's an intentional difference from XPATH (at least as implemented in lxml): tail following the closing tag is not included in the end result (this doesn't make sense for XCAP and incompatible with some of the requirements in RFC). """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '-' * (len(self.path) + self.skiplevel), '<', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] #print `name`, `curstep.name` if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return if parent is None: if curstep.position not in [None, 1]: self.skiplevel = 1 return else: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return #print '%r matched' % curstep self.curstep += 1 self.path.append(Step(qname)) if len(self.path)==len(self.selector): if self.state=='LOOKING': self.set_state('FOUND') self.start_pos = self.pos() elif self.state=='DONE': self.set_state('MANY') def endElementNS(self, name, qname): #print '-' * (len(self.path) + self.skiplevel-1), '>', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return if len(self.path)==len(self.selector) and self.state=='FOUND': self.set_state('DONE') # QQQ why qname passed to endElementNS is None? qname = self.path[-1].name self.set_end_pos(self.pos(), '') # where does pos() point to? two cases: # 1. ....*HERE* # 2. *HERE*... # If it's the first case we need to adjust pos() by len('') # To determine the case, let's mark the position of the next startElement/endElement # and see if there '' substring right after end_pos limited by end_pos_2 # 1. ....*end_pos*...*end_pos_2*<... # 2. *end_pos*...*end_pos_2*<... element = self.path.pop() self.curstep -= 1 class InsertPointLocator(ContentHandlerBase): """Locate the insertion point -- where in the document a new element should be inserted. It operates under assumption that the request didn't yield any matches with ElementLocator (its state was 'LOOKING' after parsing). Note, that this class doesn't know what will be inserted and therefore may do not do what you want with requests like 'labels/*[att="new-att"]'. """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '<' * (1+len(self.path) + self.skiplevel), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return is_last_step = len(self.path)+1 == len(self.selector) if not is_last_step: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and \ attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return else: if curstep.position == 1 and parent.position == 1: self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2=self.pos()) self.curstep += 1 self.path.append(Step(qname)) def endElementNS(self, name, qname): #print '>' * (1+len(self.path)+self.skiplevel-1), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return qname = self.path[-1].name curstep = self.selector[-1] if len(self.path)==len(self.selector): parent = self.path[-2] if curstep.position is None: if self.state=='DONE': self.set_state('MANY') else: self.set_state('CLOSED') self.set_end_pos(self.pos(), '') elif curstep.position-1 == parent.position: if self.state=='DONE': self.set_state('MANY') else: self.set_state('DONE') self.set_end_pos(self.pos(), '') elif len(self.path)+1==len(self.selector): if self.state == 'CLOSED': self.set_state('DONE') if curstep.name=='*' and curstep.position is None: self.set_end_pos(self.pos(), end_pos_2 = self.pos()) elif self.state == 'LOOKING': self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2 = self.pos()) element = self.path.pop() self.curstep -= 1 class LocatorError(ValueError): def __init__(self, msg, handler=None): ValueError.__init__(self, msg) self.handler = handler @staticmethod def generate_error(locator, element_selector): if locator.state == 'LOOKING': return None elif locator.state == 'MANY': raise SelectorError(element_selector._original_string, locator) else: raise LocatorError('Internal error in %s' % locator.__class__.__name__, locator) class SelectorError(LocatorError): http_error = 404 def __init__(self, selector, handler=None): msg = 'The requested node selector %s matches more than one element' % selector LocatorError.__init__(self, msg, handler) def find(document, element_selector): """Return an element as (first index, last index+1) If it couldn't be found, return None. If there're several matches, raise SelectorError. """ parser = make_parser() el = ElementLocator(element_selector) parser.setContentHandler(el) parser.parse(StringIO(document)) if el.state == 'DONE': el.fix_end_pos(document) return (el.start_pos, el.end_pos) else: return LocatorError.generate_error(el, element_selector) def get(document, element_selector): """Return an element as a string. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[start:end] def delete(document, element_selector): """Return document with element deleted. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[:start] + document[end:] def put(document, element_selector, element_str): """Return a 2-items tuple: (new_document, created). new_document is a copy of document with element_str inside. created is True if insertion was performed as opposed to replacement. If element_selector matches an existing element, it is replaced with element_str. If not, it is inserted at appropriate place. If it's impossible to insert at this location, return None. If element_selector matches more than one element or more than one possible place to insert and there're no rule to resolve the ambiguity then SelectorError is raised. """ location = find(document, element_selector) if location is None: ipl = InsertPointLocator(element_selector) parser = make_parser() parser.setContentHandler(ipl) parser.parse(StringIO(document)) if ipl.state == 'DONE': ipl.fix_end_pos(document) start, end = ipl.end_pos, ipl.end_pos created = True else: return LocatorError.generate_error(ipl, element_selector) else: start, end = location created = False return (document[:start] + element_str + document[end:], created) # Q: why create a new parser for every parsing? # A: when sax.make_parser() was called once, I've occasionaly encountered an exception like this: # # File "/usr/lib/python2.5/site-packages/xcap/appusage/__init__.py", line 178, in _cb_get_element # result = XCAPElement.get(response.data, uri.node_selector.element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 323, in get # location = cls.find(document, element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 308, in find # cls.parser.setContentHandler(el) # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 128, in setContentHandler # self._reset_cont_handler() # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 234, in _reset_cont_handler # self._cont_handler.processingInstruction # exceptions.AttributeError: 'NoneType' object has no attribute 'ProcessingInstructionHandler' # # I have no idea what does that mean, but probably something to do with parser's state becoming invalid # under some circumstances. class _test(object): source1 = """ hello hi! """ source2 = """ """ rls_services_xml = """ http://xcap.example.com/resource-lists/users/sip:joe@example.com/index/~~/resource-lists/list%5b@name=%22l1%22%5d presence presence """ @staticmethod def trim(s0): "remove tail from the result" s = s0 while s and s[-1]!='>': s = s[:-1] if s: return s else: return s0 @classmethod def lxml_xpath_get(cls, xpath_expr, source=source1, namespace=None, namespaces={}): "First, use xpath from lxml, which should produce the same results for existing nodes" assert '/'.startswith(xpath_expr[:1]), xpath_expr doc = etree.parse(StringIO(source)) try: # where to put namespace? r = doc.xpath(xpath_expr, namespaces=namespaces) except etree.XPathEvalError: return uri.NodeParsingError except Exception, ex: traceback.print_exc() return ex if len(r)==1: return cls.trim(etree.tostring(r[0])) elif len(r)>1: return SelectorError @staticmethod def xcap_get(xpath_expr, source=source1, namespace=None, namespaces={}): "Second, use xpath_get_element" try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return get(source, selector) except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @staticmethod def xcap_put(xpath_expr, element, source=source1, namespace=None, namespaces={}): try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return put(source, selector, element)[0] except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @classmethod def test_get(cls): emph1 = 'Midwinter Spring' thomas = 'Thomas Eliot' ezra = 'Ezra Pound' hi = 'hi!' yesterday = '