diff --git a/xcap/authentication.py b/xcap/authentication.py index 5ca2d78..24bb566 100644 --- a/xcap/authentication.py +++ b/xcap/authentication.py @@ -1,336 +1,336 @@ """XCAP authentication module""" # XXX this module should be either renamed or refactored as it does more then just auth. from xcap import tweaks; tweaks.tweak_BasicCredentialFactory() from zope.interface import Interface, implements from twisted.internet import defer from twisted.python import failure from twisted.cred import credentials, portal, checkers, error as credError from twisted.web2 import http, server, stream, responsecode, http_headers from twisted.web2.auth.wrapper import HTTPAuthResource, UnauthorizedResponse from application.configuration.datatypes import NetworkRangeList from application.configuration import ConfigSection, ConfigSetting import struct import socket import urlparse import xcap from xcap.datatypes import XCAPRootURI from xcap.appusage import getApplicationForURI, namespaces, public_get_applications from xcap.errors import ResourceNotFound from xcap.uri import XCAPUser, XCAPUri # body of 404 error message to render when user requests xcap-root # it's html, because XCAP root is often published on the web. # NOTE: there're no plans to convert other error messages to html. # Since a web-browser is not the primary tool for accessing XCAP server, text/plain # is easier for clients to present to user/save to logs/etc. WELCOME = ('Not Found' '

Not Found

XCAP server does not serve anything ' 'directly under XCAP Root URL. You have to be more specific.' '

' '
OpenXCAP/%s
' '') % xcap.__version__ class AuthenticationConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Authentication' default_realm = ConfigSetting(type=str, value=None) trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('none')) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' root = ConfigSetting(type=XCAPRootURI, value=None) def generateWWWAuthenticate(headers): _generated = [] for seq in headers: scheme, challenge = seq[0], seq[1] # If we're going to parse out to something other than a dict # we need to be able to generate from something other than a dict try: l = [] for k,v in dict(challenge).iteritems(): l.append("%s=%s" % (k, k in ("algorithm", "stale") and v or http_headers.quoteString(v))) _generated.append("%s %s" % (scheme, ", ".join(l))) except ValueError: _generated.append("%s %s" % (scheme, challenge)) return _generated http_headers.generator_response_headers["WWW-Authenticate"] = (generateWWWAuthenticate,) http_headers.DefaultHTTPHandler.updateGenerators(http_headers.generator_response_headers) del generateWWWAuthenticate def parseNodeURI(node_uri, default_realm): """Parses the given Node URI, containing the XCAP root, document selector, and node selector, and returns an XCAPUri instance if succesful.""" xcap_root = None for uri in ServerConfig.root.uris: if node_uri.startswith(uri): xcap_root = uri break if xcap_root is None: raise ResourceNotFound("XCAP root not found for URI: %s" % node_uri) resource_selector = node_uri[len(xcap_root):] if not resource_selector or resource_selector=='/': raise ResourceNotFound(WELCOME, http_headers.MimeType("text", "html")) r = XCAPUri(xcap_root, resource_selector, namespaces) if r.user.domain is None: r.user.domain = default_realm return r class ITrustedPeerCredentials(credentials.ICredentials): def checkPeer(self, trusted_peers): pass class TrustedPeerCredentials(object): implements(ITrustedPeerCredentials) def __init__(self, peer): self.peer = peer def checkPeer(self, trusted_peers): for range in trusted_peers: if struct.unpack('!L', socket.inet_aton(self.peer))[0] & range[1] == range[0]: return True return False class IPublicGetApplicationCredentials(credentials.ICredentials): def checkApplication(self): pass class PublicGetApplicationCredentials(object): implements(IPublicGetApplicationCredentials) def checkApplication(self): return True ## credentials checkers class TrustedPeerChecker(object): implements(checkers.ICredentialsChecker) credentialInterfaces = (ITrustedPeerCredentials,) def __init__(self, trusted_peers): self.trusted_peers = trusted_peers def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have a 'peer' attribute, or an UnauthorizedLogin in case of a failure.""" if credentials.checkPeer(self.trusted_peers): return defer.succeed(credentials.peer) return defer.fail(credError.UnauthorizedLogin()) class PublicGetApplicationChecker(object): implements(checkers.ICredentialsChecker) credentialInterfaces = (IPublicGetApplicationCredentials,) def requestAvatarId(self, credentials): """We already know that the method is GET and the application is a 'public GET application', we just need to say that the authentication succeeded.""" if credentials.checkApplication(): return defer.succeed(None) return defer.fail(credError.UnauthorizedLogin()) ## avatars class IAuthUser(Interface): pass class ITrustedPeer(Interface): pass class IPublicGetApplication(Interface): pass class AuthUser(str): """Authenticated XCAP User avatar.""" implements(IAuthUser) class TrustedPeer(str): """Trusted peer avatar.""" implements(ITrustedPeer) class PublicGetApplication(str): """Public get application avatar.""" implements(IPublicGetApplication) ## realm class XCAPAuthRealm(object): """XCAP authentication realm. Receives an avatar ID (a string identifying the user) and a list of interfaces the avatar needs to support. It returns an avatar that encapsulates data about that user.""" implements(portal.IRealm) def requestAvatar(self, avatarId, mind, *interfaces): if IAuthUser in interfaces: return IAuthUser, AuthUser(avatarId) elif ITrustedPeer in interfaces: return ITrustedPeer, TrustedPeer(avatarId) elif IPublicGetApplication in interfaces: return IPublicGetApplication, PublicGetApplication(avatarId) raise NotImplementedError("Only IAuthUser and ITrustedPeer interfaces are supported") def get_cred(request, default_realm): auth = request.headers.getHeader('authorization') if auth: typ, data = auth if typ == 'basic': return data.decode('base64').split(':', 1)[0], default_realm elif typ == 'digest': raise NotImplementedError return None, default_realm ## authentication wrapper for XCAP resources class XCAPAuthResource(HTTPAuthResource): def allowedMethods(self): - return ('GET', 'PUT', 'DELETE') + return 'GET', 'PUT', 'DELETE' def _updateRealm(self, realm): """Updates the realm of the attached credential factories.""" for factory in self.credentialFactories.values(): factory.realm = realm def authenticate(self, request): """Authenticates an XCAP request.""" parsed_url = urlparse.urlparse(request.uri) if request.port in (80, 443): uri = request.scheme + "://" + request.host + parsed_url.path else: uri = request.scheme + "://" + request.host + ":" + str(request.port) + parsed_url.path if parsed_url.query: uri += "?%s" % parsed_url.query xcap_uri = parseNodeURI(uri, AuthenticationConfig.default_realm) request.xcap_uri = xcap_uri if xcap_uri.doc_selector.context=='global': return defer.succeed(self.wrappedResource) ## For each request the authentication realm must be ## dinamically deducted from the XCAP request URI realm = xcap_uri.user.domain if realm is None: raise ResourceNotFound('Unknown domain (the domain part of "username@domain" is required because this server has no default domain)') if not xcap_uri.user.username: # for 'global' requests there's no username@domain in the URI, # so we will use username and domain from Authorization header xcap_uri.user.username, xcap_uri.user.domain = get_cred(request, AuthenticationConfig.default_realm) self._updateRealm(realm) # If we receive a GET to a 'public GET application' we will not authenticate it if request.method == "GET" and public_get_applications.has_key(xcap_uri.application_id): return self.portal.login(PublicGetApplicationCredentials(), None, IPublicGetApplication ).addCallbacks(self._loginSucceeded, self._publicGetApplicationLoginFailed, (request,), None, (request,), None) remote_addr = request.remoteAddr.host if AuthenticationConfig.trusted_peers: return self.portal.login(TrustedPeerCredentials(remote_addr), None, ITrustedPeer ).addCallbacks(self._loginSucceeded, self._trustedPeerLoginFailed, (request,), None, (request,), None) return HTTPAuthResource.authenticate(self, request) def _trustedPeerLoginFailed(self, result, request): """If the peer is not trusted, fallback to HTTP basic/digest authentication.""" return HTTPAuthResource.authenticate(self, request) def _publicGetApplicationLoginFailed(self, result, request): return HTTPAuthResource.authenticate(self, request) def _loginSucceeded(self, avatar, request): """Authorizes an XCAP request after it has been authenticated.""" interface, avatar_id = avatar ## the avatar is the authenticated XCAP User xcap_uri = request.xcap_uri application = getApplicationForURI(xcap_uri) if not application: raise ResourceNotFound if interface is IAuthUser and application.is_authorized(XCAPUser.parse(avatar_id), xcap_uri): return HTTPAuthResource._loginSucceeded(self, avatar, request) elif interface is ITrustedPeer or interface is IPublicGetApplication: return HTTPAuthResource._loginSucceeded(self, avatar, request) else: return failure.Failure( http.HTTPError( UnauthorizedResponse( self.credentialFactories, request.remoteAddr))) def locateChild(self, request, seg): """ Authenticate the request then return the C{self.wrappedResource} and the unmodified segments. We're not using path location, we want to fall back to the renderHTTP() call. """ #return self.authenticate(request), seg return self, server.StopTraversal def renderHTTP(self, request): """ Authenticate the request then return the result of calling renderHTTP on C{self.wrappedResource} """ if request.method not in self.allowedMethods(): response = http.Response(responsecode.NOT_ALLOWED) response.headers.setHeader("allow", self.allowedMethods()) return response def _renderResource(resource): return resource.renderHTTP(request) def _finished_reading(ignore, result): data = ''.join(result) request.attachment = data d = self.authenticate(request) d.addCallback(_renderResource) return d if request.method in ('PUT', 'DELETE'): # we need to authenticate the request after all the attachment stream # has been read # QQQ DELETE doesn't have any attachments, does it? nor does GET. # QQQ Reading attachment when there isn't one won't hurt, will it? # QQQ So why don't we just do it all the time for all requests? data = [] d = stream.readStream(request.stream, data.append) d.addCallback(_finished_reading, data) return d else: d = self.authenticate(request) d.addCallback(_renderResource) return d diff --git a/xcap/backend/sipthor.py b/xcap/backend/sipthor.py index e2835cd..967095b 100644 --- a/xcap/backend/sipthor.py +++ b/xcap/backend/sipthor.py @@ -1,595 +1,595 @@ import re import signal import cjson from formencode import validators from application import log from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from application.system import host from application.process import process from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress from sqlobject import sqlhub, connectionForURI, SQLObject, AND from sqlobject import StringCol, IntCol, DateTimeCol, SOBLOBCol, Col from sqlobject import MultipleJoin, ForeignKey from zope.interface import implements from twisted.internet import reactor from twisted.internet import defer from twisted.internet.defer import Deferred, maybeDeferred from twisted.cred.checkers import ICredentialsChecker from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword from twisted.cred.error import UnauthorizedLogin from thor.link import ControlLink, Notification, Request from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from gnutls.interfaces.twisted import TLSContext, X509Credentials from sipsimple.core import Engine, FromHeader, Header, Publication, RouteHeader, SIPURI from sipsimple.threading import run_in_twisted_thread import xcap from xcap.tls import Certificate, PrivateKey from xcap.backend import StatusResponse from xcap.datatypes import XCAPRootURI from xcap.dbutil import make_random_etag from xcap.xcapdiff import Notifier class ThorNodeConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'ThorNetwork' domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class ServerConfig(ConfigSection): __cfgfile__ = xcap.__cfgfile__ __section__ = 'Server' address = ConfigSetting(type=IPAddress, value='0.0.0.0') root = ConfigSetting(type=XCAPRootURI, value=None) class JSONValidator(validators.Validator): def to_python(self, value, state): if value is None: return None try: return cjson.decode(value) except Exception: raise validators.Invalid("expected a decodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) def from_python(self, value, state): if value is None: return None try: return cjson.encode(value) except Exception: raise validators.Invalid("expected an encodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value), value, state) class SOJSONCol(SOBLOBCol): def createValidators(self): return [JSONValidator()] + super(SOJSONCol, self).createValidators() class JSONCol(Col): baseClass = SOJSONCol class SipAccount(SQLObject): class sqlmeta: table = 'sip_accounts_meta' username = StringCol(length=64) domain = StringCol(length=64) firstName = StringCol(length=64) lastName = StringCol(length=64) email = StringCol(length=64) customerId = IntCol(default=0) resellerId = IntCol(default=0) ownerId = IntCol(default=0) changeDate = DateTimeCol(default=DateTimeCol.now) ## joins data = MultipleJoin('SipAccountData', joinColumn='account_id') def _set_profile(self, value): data = list(self.data) if not data: SipAccountData(account=self, profile=value) else: data[0].profile = value def _get_profile(self): return self.data[0].profile def set(self, **kwargs): kwargs = kwargs.copy() profile = kwargs.pop('profile', None) SQLObject.set(self, **kwargs) if profile is not None: self._set_profile(profile) class SipAccountData(SQLObject): class sqlmeta: table = 'sip_accounts_data' account = ForeignKey('SipAccount', cascade=True) profile = JSONCol() class ThorEntityAddress(str): def __new__(cls, ip, control_port=None, version='unknown'): instance = str.__new__(cls, ip) instance.ip = ip instance.version = version instance.control_port = control_port return instance class GetSIPWatchers(Request): def __new__(cls, account): command = "get sip_watchers for %s" % account instance = Request.__new__(cls, command) return instance class XCAPProvisioning(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): self._database = DatabaseConnection() self.node = ThorEntity(host.default_ip if ServerConfig.address == '0.0.0.0' else ServerConfig.address, ['xcap_server'], version=xcap.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) self.control = ControlLink(tls_context) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) def _disconnect_all(self, result): self.control.disconnect_all() EventServiceClient._disconnect_all(self, result) def lookup(self, key): network = self.networks.get("sip_proxy", None) if network is None: return None try: node = network.lookup_node(key) except LookupError: node = None except Exception: log.err() node = None return node def notify(self, operation, entity_type, entity): node = self.lookup(entity) if node is not None: if node.control_port is None: log.error("Could not send notify because node %s has no control port" % node.ip) return self.control.send_request(Notification("notify %s %s %s" % (operation, entity_type, entity)), (node.ip, node.control_port)) def get_watchers(self, key): node = self.lookup(key) if node is None: return defer.fail("no nodes found when searching for key %s" % str(key)) if node.control_port is None: return defer.fail("could not send notify because node %s has no control port" % node.ip) request = GetSIPWatchers(key) request.deferred = Deferred() self.control.send_request(request, (node.ip, node.control_port)) return request.deferred def handle_event(self, event): # print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role thor_databases = role_map.get('thor_database', []) if thor_databases: thor_databases.sort(lambda x, y: cmp(x.priority, y.priority) or cmp(x.ip, y.ip)) dburi = thor_databases[0].dburi else: dburi = None self._database.update_dburi(dburi) all_roles = role_map.keys() + networks.keys() for role in all_roles: try: network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]: continue else: network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])]) old_nodes = set(network.nodes) ## compute set differences added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) self.control.discard_link((node.ip, node.control_port)) plural = len(removed_nodes) != 1 and 's' or '' log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) #print "Thor %s nodes: %s" % (role, str(network.nodes)) class NotFound(Exception): pass class NoDatabase(Exception): pass class DatabaseConnection(object): __metaclass__ = Singleton def __init__(self): self.dburi = None # Methods to be called from the Twisted thread: def put(self, uri, document, check_etag, new_etag): defer = Deferred() operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def delete(self, uri, check_etag): defer = Deferred() operation = lambda profile: self._delete_operation(uri, check_etag, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def delete_all(self, uri): defer = Deferred() operation = lambda profile: self._delete_all_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, True, defer) return defer def get(self, uri): defer = Deferred() operation = lambda profile: self._get_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) return defer def get_profile(self, username, domain): defer = Deferred() reactor.callInThread(self.retrieve_profile, username, domain, lambda profile: profile, False, defer) return defer def get_documents_list(self, uri): defer = Deferred() operation = lambda profile: self._get_documents_list_operation(uri, profile) reactor.callInThread(self.retrieve_profile, uri.user.username, uri.user.domain, operation, False, defer) return defer # Methods to be called in a separate thread: def _put_operation(self, uri, document, check_etag, new_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: found = False etag = None check_etag(None, False) else: found = True check_etag(etag) xcap_app = xcap_docs.setdefault(uri.application_id, {}) xcap_app[uri.doc_selector.document_path] = (document, new_etag) - return (found, etag, new_etag) + return found, etag, new_etag def _delete_operation(self, uri, check_etag, profile): xcap_docs = profile.setdefault("xcap", {}) try: etag = xcap_docs[uri.application_id][uri.doc_selector.document_path][1] except KeyError: raise NotFound() check_etag(etag) del(xcap_docs[uri.application_id][uri.doc_selector.document_path]) return (etag) def _delete_all_operation(self, uri, profile): xcap_docs = profile.setdefault("xcap", {}) xcap_docs.clear() return None def _get_operation(self, uri, profile): try: xcap_docs = profile["xcap"] doc, etag = xcap_docs[uri.application_id][uri.doc_selector.document_path] except KeyError: raise NotFound() return doc, etag def _get_documents_list_operation(self, uri, profile): try: xcap_docs = profile["xcap"] except KeyError: raise NotFound() return xcap_docs def retrieve_profile(self, username, domain, operation, update, defer): transaction = None try: if self.dburi is None: raise NoDatabase() transaction = sqlhub.processConnection.transaction() try: db_account = SipAccount.select(AND(SipAccount.q.username == username, SipAccount.q.domain == domain), connection = transaction, forUpdate = update)[0] except IndexError: raise NotFound() profile = db_account.profile result = operation(profile) # NB: may modify profile! if update: db_account.profile = profile transaction.commit(close=True) except Exception, e: if transaction: transaction.rollback() reactor.callFromThread(defer.errback, e) else: reactor.callFromThread(defer.callback, result) finally: if transaction: transaction.cache.clear() def update_dburi(self, dburi): if self.dburi != dburi: if self.dburi is not None: sqlhub.processConnection.close() if dburi is None: sqlhub.processConnection else: sqlhub.processConnection = connectionForURI(dburi) self.dburi = dburi class SipthorPasswordChecker(object): implements(ICredentialsChecker) credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword) def __init__(self): self._database = DatabaseConnection() def _query_credentials(self, credentials): username, domain = credentials.username.split('@', 1)[0], credentials.realm result = self._database.get_profile(username, domain) result.addCallback(self._got_query_results, credentials) result.addErrback(self._got_unsuccessfull) return result def _got_unsuccessfull(self, failure): failure.trap(NotFound) raise UnauthorizedLogin("Unauthorized login") def _got_query_results(self, profile, credentials): return self._authenticate_credentials(profile, credentials) def _authenticate_credentials(self, profile, credentials): raise NotImplementedError def _checkedPassword(self, matched, username, realm): if matched: username = username.split('@', 1)[0] ## this is the avatar ID return "%s@%s" % (username, realm) else: raise UnauthorizedLogin("Unauthorized login") def requestAvatarId(self, credentials): """Return the avatar ID for the credentials which must have the username and realm attributes, or an UnauthorizedLogin in case of a failure.""" d = self._query_credentials(credentials) return d class PlainPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored in plain text.""" implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkPassword, profile["password"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) class HashPasswordChecker(SipthorPasswordChecker): """A credentials checker against a database subscriber table, where the passwords are stored as MD5 hashes.""" implements(ICredentialsChecker) def _authenticate_credentials(self, profile, credentials): return maybeDeferred( credentials.checkHash, profile["ha1"]).addCallback( self._checkedPassword, credentials.username, credentials.realm) class SIPNotifier(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.provisioning = XCAPProvisioning() self.engine = Engine() self.engine.start( ip_address=None if ServerConfig.address == '0.0.0.0' else ServerConfig.address, user_agent="OpenXCAP %s" % xcap.__version__, ) def send_publish(self, uri, body): uri = re.sub("^(sip:|sips:)", "", uri) destination_node = self.provisioning.lookup(uri) if destination_node is not None: # TODO: add configuration settings for SIP transport. -Saul publication = Publication(FromHeader(SIPURI(uri)), "xcap-diff", "application/xcap-diff+xml", duration=0, extra_headers=[Header('Thor-Scope', 'publish-xcap')]) NotificationCenter().add_observer(self, sender=publication) route_header = RouteHeader(SIPURI(host=str(destination_node), port='5060', parameters=dict(transport='udp'))) publication.publish(body, route_header, timeout=5) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPPublicationDidSucceed(self, notification): log.msg("PUBLISH for xcap-diff event successfully sent to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) def _NH_SIPPublicationDidEnd(self, notification): log.msg("PUBLISH for xcap-diff event ended for %s" % notification.sender.from_header.uri) NotificationCenter().remove_observer(self, sender=notification.sender) def _NH_SIPPublicationDidFail(self, notification): log.msg("PUBLISH for xcap-diff event failed to %s for %s" % (notification.data.route_header.uri, notification.sender.from_header.uri)) NotificationCenter().remove_observer(self, sender=notification.sender) class Storage(object): __metaclass__ = Singleton def __init__(self): self._database = DatabaseConnection() self._provisioning = XCAPProvisioning() self._sip_notifier = SIPNotifier() self._notifier = Notifier(ServerConfig.root, self._sip_notifier.send_publish) def _normalize_document_path(self, uri): if uri.application_id in ("pres-rules", "org.openmobilealliance.pres-rules"): # some clients e.g. counterpath's eyebeam save presence rules under # different filenames between versions and they expect to find the same # information, thus we are forcing all presence rules documents to be # saved under "index.xml" default filename uri.doc_selector.document_path = "index.xml" def get_document(self, uri, check_etag): self._normalize_document_path(uri) result = self._database.get(uri) result.addCallback(self._got_document, check_etag) result.addErrback(self._eb_not_found) return result def _eb_not_found(self, failure): failure.trap(NotFound) return StatusResponse(404) def _got_document(self, (doc, etag), check_etag): check_etag(etag) return StatusResponse(200, etag, doc.encode('utf-8')) def put_document(self, uri, document, check_etag): document = document.decode('utf-8') self._normalize_document_path(uri) etag = make_random_etag(uri) result = self._database.put(uri, document, check_etag, etag) result.addCallback(self._cb_put, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_put(self, result, uri, thor_key): if result[0]: code = 200 else: code = 201 self._provisioning.notify("update", "sip_account", thor_key) self._notifier.on_change(uri, result[1], result[2]) return StatusResponse(code, result[2]) def delete_documents(self, uri): result = self._database.delete_all(uri) result.addCallback(self._cb_delete_all, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_delete_all(self, result, uri, thor_key): self._provisioning.notify("update", "sip_account", thor_key) return StatusResponse(200) def delete_document(self, uri, check_etag): self._normalize_document_path(uri) result = self._database.delete(uri, check_etag) result.addCallback(self._cb_delete, uri, "%s@%s" % (uri.user.username, uri.user.domain)) result.addErrback(self._eb_not_found) return result def _cb_delete(self, result, uri, thor_key): self._provisioning.notify("update", "sip_account", thor_key) self._notifier.on_change(uri, result[1], None) return StatusResponse(200) def get_watchers(self, uri): thor_key = "%s@%s" % (uri.user.username, uri.user.domain) result = self._provisioning.get_watchers(thor_key) result.addCallback(self._get_watchers_decode) return result def _get_watchers_decode(self, response): if response.code == 200: watchers = cjson.decode(response.data) for watcher in watchers: watcher["online"] = str(watcher["online"]).lower() return watchers else: print "error: %s" % response def get_documents_list(self, uri): result = self._database.get_documents_list(uri) result.addCallback(self._got_documents_list) result.addErrback(self._got_documents_list_error) return result def _got_documents_list(self, xcap_docs): docs = {} if xcap_docs: for k, v in xcap_docs.iteritems(): for k2, v2 in v.iteritems(): if docs.has_key(k): docs[k].append((k2, v2[1])) else: docs[k] = [(k2, v2[1])] return docs def _got_documents_list_error(self, failure): failure.trap(NotFound) return {} installSignalHandlers = False diff --git a/xcap/element.py b/xcap/element.py index e630871..ce25b7c 100644 --- a/xcap/element.py +++ b/xcap/element.py @@ -1,872 +1,872 @@ """Element handling as described in RFC 4825. This module implements * location of an element in xml document * location of insertion point for a new element in xml document This allows to implement GET/PUT/DELETE for elements in XCAP server. Syntax for element selectors is a subset of XPATH, but an XPATH implementation was not used. One reason is that XPATH only implements locating an element but not an insertion point for an element selector which does not point to an existing element (but will point to the inserted element after PUT). For element selectors of type *[@att="value"] insertion point depends on the content of a new element. For RFC compliant behavior, fix such requests by replacing '*' with the root tag of the new element. """ from StringIO import StringIO from xcap import uri from xml import sax def make_parser(): parser = sax.make_parser(['xcap.sax.expatreader']) parser.setFeature(sax.handler.feature_namespaces, 1) parser.setFeature(sax.handler.feature_namespace_prefixes, 1) return parser class ThrowEventsAway(sax.ContentHandler): pass def check_xml_fragment(element_str): """Run SAX parser on element_str to check its well-formedness. Ignore unbound namespaces prefixes. >>> check_xml_fragment("") >>> check_xml_fragment(''' ... Test ... ''') >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: mismatched tag >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:5: not well-formed (invalid token) >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:7: junk after document element >>> check_xml_fragment("") Traceback (most recent call last): ... SAXParseException: :1:4: not well-formed (invalid token) """ parser = sax.make_parser(['xcap.sax.expatreader']) # ignore namespaces and prefixes parser.setFeature(sax.handler.feature_namespaces, 0) parser.setFeature(sax.handler.feature_namespace_prefixes, 0) parser.setContentHandler(ThrowEventsAway()) parser.parse(StringIO(element_str)) class Step(object): # to be matched against uri.Step def __init__(self, name, position = 0): self.name = name # this integer holds index of a child element currently in processing self.position = position def __repr__(self): return '%s[pos=%s]' % (self.name, self.position) class ContentHandlerBase(sax.ContentHandler): def __init__(self, selector): sax.ContentHandler.__init__(self) self.selector = selector self.state = None self.locator = None def setDocumentLocator(self, locator): self.locator = locator def pos(self): return self.locator._ref._parser.CurrentByteIndex def set_state(self, new_state): #print new_state, 'at %s' % str(self.pos()) self.state = new_state def set_end_pos(self, end_pos, end_tag = None, end_pos_2 = None): self.end_pos = end_pos self.end_tag = end_tag self.end_pos_2 = end_pos_2 def fix_end_pos(self, document): if self.end_tag is not None and self.end_tag in document[self.end_pos:self.end_pos_2]: if self.end_pos_2 is None: self.end_pos = 1 + document.index('>', self.end_pos) else: self.end_pos = 1 + document.index('>', self.end_pos, self.end_pos_2) def __repr__(self): return '<%s selector=%r state=%r>' % (self.__class__.__name__, self.selector, self.state) class ElementLocator(ContentHandlerBase): """Locates element in a document by element selector expression (subset of XPATH defined in RFC 4825) There's an intentional difference from XPATH (at least as implemented in lxml): tail following the closing tag is not included in the end result (this doesn't make sense for XCAP and incompatible with some of the requirements in RFC). """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '-' * (len(self.path) + self.skiplevel), '<', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] #print `name`, `curstep.name` if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return if parent is None: if curstep.position not in [None, 1]: self.skiplevel = 1 return else: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return #print '%r matched' % curstep self.curstep += 1 self.path.append(Step(qname)) if len(self.path)==len(self.selector): if self.state=='LOOKING': self.set_state('FOUND') self.start_pos = self.pos() elif self.state=='DONE': self.set_state('MANY') def endElementNS(self, name, qname): #print '-' * (len(self.path) + self.skiplevel-1), '>', name, '/' + '/'.join(map(str, self.path)) if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return if len(self.path)==len(self.selector) and self.state=='FOUND': self.set_state('DONE') # QQQ why qname passed to endElementNS is None? qname = self.path[-1].name self.set_end_pos(self.pos(), '') # where does pos() point to? two cases: # 1. ....*HERE* # 2. *HERE*... # If it's the first case we need to adjust pos() by len('') # To determine the case, let's mark the position of the next startElement/endElement # and see if there '' substring right after end_pos limited by end_pos_2 # 1. ....*end_pos*...*end_pos_2*<... # 2. *end_pos*...*end_pos_2*<... element = self.path.pop() self.curstep -= 1 class InsertPointLocator(ContentHandlerBase): """Locate the insertion point -- where in the document a new element should be inserted. It operates under assumption that the request didn't yield any matches with ElementLocator (its state was 'LOOKING' after parsing). Note, that this class doesn't know what will be inserted and therefore may do not do what you want with requests like 'labels/*[att="new-att"]'. """ def startDocument(self): if self.locator is None: raise RuntimeError("The parser doesn't support locators") self.path = [] self.state = 'LOOKING' self.curstep = 0 self.skiplevel = 0 self.set_end_pos(None, None, None) def startElementNS(self, name, qname, attrs): #print '<' * (1+len(self.path) + self.skiplevel), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel += 1 return if self.curstep>=len(self.selector): self.skiplevel = 1 return if self.path: parent = self.path[-1] else: parent = None curstep = self.selector[self.curstep] if curstep.name == '*' or curstep.name == name: if parent: parent.position += 1 else: self.skiplevel = 1 return is_last_step = len(self.path)+1 == len(self.selector) if not is_last_step: if curstep.position is not None and curstep.position != parent.position: self.skiplevel = 1 return if curstep.att_name is not None and \ attrs.get(curstep.att_name)!=curstep.att_value: self.skiplevel = 1 return else: if curstep.position == 1 and parent.position == 1: self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2=self.pos()) self.curstep += 1 self.path.append(Step(qname)) def endElementNS(self, name, qname): #print '>' * (1+len(self.path)+self.skiplevel-1), name, '/' + '/'.join(map(str, self.path)), #print self.curstep, self.skiplevel if self.state=='DONE' and self.end_pos_2 is None: self.end_pos_2 = self.pos() if self.skiplevel>0: self.skiplevel -= 1 return qname = self.path[-1].name curstep = self.selector[-1] if len(self.path)==len(self.selector): parent = self.path[-2] if curstep.position is None: if self.state=='DONE': self.set_state('MANY') else: self.set_state('CLOSED') self.set_end_pos(self.pos(), '') elif curstep.position-1 == parent.position: if self.state=='DONE': self.set_state('MANY') else: self.set_state('DONE') self.set_end_pos(self.pos(), '') elif len(self.path)+1==len(self.selector): if self.state == 'CLOSED': self.set_state('DONE') if curstep.name=='*' and curstep.position is None: self.set_end_pos(self.pos(), end_pos_2 = self.pos()) elif self.state == 'LOOKING': self.set_state('DONE') self.set_end_pos(self.pos(), end_pos_2 = self.pos()) element = self.path.pop() self.curstep -= 1 class LocatorError(ValueError): def __init__(self, msg, handler=None): ValueError.__init__(self, msg) self.handler = handler @staticmethod def generate_error(locator, element_selector): if locator.state == 'LOOKING': return None elif locator.state == 'MANY': raise SelectorError(element_selector._original_string, locator) else: raise LocatorError('Internal error in %s' % locator.__class__.__name__, locator) class SelectorError(LocatorError): http_error = 404 def __init__(self, selector, handler=None): msg = 'The requested node selector %s matches more than one element' % selector LocatorError.__init__(self, msg, handler) def find(document, element_selector): """Return an element as (first index, last index+1) If it couldn't be found, return None. If there're several matches, raise SelectorError. """ parser = make_parser() el = ElementLocator(element_selector) parser.setContentHandler(el) parser.parse(StringIO(document)) if el.state == 'DONE': el.fix_end_pos(document) - return (el.start_pos, el.end_pos) + return el.start_pos, el.end_pos else: return LocatorError.generate_error(el, element_selector) def get(document, element_selector): """Return an element as a string. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[start:end] def delete(document, element_selector): """Return document with element deleted. If it couldn't be found, return None. If there're several matches, raise SelectorError. """ location = find(document, element_selector) if location is not None: start, end = location return document[:start] + document[end:] def put(document, element_selector, element_str): """Return a 2-items tuple: (new_document, created). new_document is a copy of document with element_str inside. created is True if insertion was performed as opposed to replacement. If element_selector matches an existing element, it is replaced with element_str. If not, it is inserted at appropriate place. If it's impossible to insert at this location, return None. If element_selector matches more than one element or more than one possible place to insert and there're no rule to resolve the ambiguity then SelectorError is raised. """ location = find(document, element_selector) if location is None: ipl = InsertPointLocator(element_selector) parser = make_parser() parser.setContentHandler(ipl) parser.parse(StringIO(document)) if ipl.state == 'DONE': ipl.fix_end_pos(document) start, end = ipl.end_pos, ipl.end_pos created = True else: return LocatorError.generate_error(ipl, element_selector) else: start, end = location created = False return (document[:start] + element_str + document[end:], created) # Q: why create a new parser for every parsing? # A: when sax.make_parser() was called once, I've occasionaly encountered an exception like this: # # File "/usr/lib/python2.5/site-packages/xcap/appusage/__init__.py", line 178, in _cb_get_element # result = XCAPElement.get(response.data, uri.node_selector.element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 323, in get # location = cls.find(document, element_selector) # File "/usr/lib/python2.5/site-packages/xcap/element.py", line 308, in find # cls.parser.setContentHandler(el) # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 128, in setContentHandler # self._reset_cont_handler() # File "/usr/lib/python2.5/site-packages/_xmlplus/sax/expatreader.py", line 234, in _reset_cont_handler # self._cont_handler.processingInstruction # exceptions.AttributeError: 'NoneType' object has no attribute 'ProcessingInstructionHandler' # # I have no idea what does that mean, but probably something to do with parser's state becoming invalid # under some circumstances. class _test(object): source1 = """ hello hi! """ source2 = """ """ rls_services_xml = """ http://xcap.example.com/resource-lists/users/sip:joe@example.com/index/~~/resource-lists/list%5b@name=%22l1%22%5d presence presence """ @staticmethod def trim(s0): "remove tail from the result" s = s0 while s and s[-1]!='>': s = s[:-1] if s: return s else: return s0 @classmethod def lxml_xpath_get(cls, xpath_expr, source=source1, namespace=None, namespaces={}): "First, use xpath from lxml, which should produce the same results for existing nodes" assert '/'.startswith(xpath_expr[:1]), xpath_expr doc = etree.parse(StringIO(source)) try: # where to put namespace? r = doc.xpath(xpath_expr, namespaces=namespaces) except etree.XPathEvalError: return uri.NodeParsingError except Exception, ex: traceback.print_exc() return ex if len(r)==1: return cls.trim(etree.tostring(r[0])) elif len(r)>1: return SelectorError @staticmethod def xcap_get(xpath_expr, source=source1, namespace=None, namespaces={}): "Second, use xpath_get_element" try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return get(source, selector) except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @staticmethod def xcap_put(xpath_expr, element, source=source1, namespace=None, namespaces={}): try: selector = uri.parse_node_selector(xpath_expr, namespace, namespaces)[0] return put(source, selector, element)[0] except (uri.NodeParsingError, SelectorError), ex : return ex.__class__ except Exception, ex: traceback.print_exc() return ex @classmethod def test_get(cls): emph1 = 'Midwinter Spring' thomas = 'Thomas Eliot' ezra = 'Ezra Pound' hi = 'hi!' yesterday = '