diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py new file mode 100644 index 0000000..e80dea7 --- /dev/null +++ b/sylk/applications/xmppgateway/__init__.py @@ -0,0 +1,334 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import os + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from sipsimple.core import SIPURI +from sipsimple.payloads import ParserError +from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage +from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError +from sipsimple.threading.green import run_in_green_thread +from zope.interface import implements + +from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger +from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource +from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler +from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler +from sylk.applications.xmppgateway.xmpp import ChatMessage, ChatComposingIndication, NormalMessage, XMPPChatSession +from sylk.applications.xmppgateway.xmpp import XMPPManager + +log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) + + +class XMPPGatewayApplication(object): + __metaclass__ = SylkApplication + implements(ISylkApplication, IObserver) + + __appname__ = 'xmppgateway' + + def __init__(self): + self.xmpp_manager = XMPPManager() + self.pending_sessions = {} + self.chat_sessions = set() + self.s2x_presence_subscriptions = {} + self.x2s_presence_subscriptions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self.xmpp_manager) + self.xmpp_manager.start() + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.xmpp_manager) + self.xmpp_manager.stop() + + def incoming_session(self, session): + log.msg('New incoming session from %s' % session.remote_identity.uri) + try: + msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() + except StopIteration: + session.reject(488) + return + + # Get URI representing the SIP side + contact_uri = session._invitation.remote_contact_header.uri + if contact_uri.parameters.get('gr') is not None: + sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) + else: + tmp = session.remote_identity.uri + sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) + + # Get URI representing the XMPP side + request_uri = session._invitation.request_uri + remote_resource = request_uri.parameters.get('gr', None) + if remote_resource is not None: + try: + remote_resource = decode_resource(remote_resource) + except (TypeError, UnicodeError): + pass + xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) + + try: + handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] + except KeyError: + pass + else: + # There is another pending session with same identifiers, can't accept this one + session.reject(488) + return + + sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) + handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + key = (sip_leg_uri, xmpp_leg_uri) + self.pending_sessions[key] = handler + + if xmpp_leg_uri.resource is not None: + # Incoming session target contained GRUU, so create XMPPChatSession immediately + xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) + handler.xmpp_identity = xmpp_session.remote_identity + handler.xmpp_session = xmpp_session + + def incoming_subscription(self, subscribe_request, data): + if subscribe_request.event != 'presence': + subscribe_request.reject(489) + return + # Get URI representing the SIP side + remote_identity_uri = data.headers['From'].uri + sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) + + # Get URI representing the XMPP side + request_uri = data.request_uri + xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) + + try: + handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] + except KeyError: + sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) + xmpp_identity = Identity(xmpp_leg_uri) + handler = S2XPresenceHandler(sip_identity, xmpp_identity) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + handler.start() + handler.add_sip_subscription(subscribe_request) + + def incoming_referral(self, refer_request, data): + refer_request.reject(405) + + def incoming_sip_message(self, message_request, data): + content_type = data.headers.get('Content-Type', Null)[0] + from_header = data.headers.get('From', Null) + to_header = data.headers.get('To', Null) + if Null in (content_type, from_header, to_header): + message_request.answer(400) + return + log.msg('New incoming SIP MESSAGE from %s' % from_header.uri) + if content_type == 'message/cpim': + try: + cpim_message = CPIMMessage.parse(data.body) + except CPIMParserError: + message_request.answer(400) + return + else: + body = cpim_message.body + content_type = cpim_message.content_type + sender = cpim_message.sender or from_header + from_uri = sender.uri + else: + body = data.body + from_uri = from_header.uri + to_uri = str(to_header.uri) + message_request.answer(200) + if from_uri.parameters.get('gr', None) is None: + from_uri = SIPURI.new(from_uri) + from_uri.parameters['gr'] = generate_sylk_resource() + sender = Identity(FrozenURI.parse(from_uri)) + recipient = Identity(FrozenURI.parse(to_uri)) + if content_type in ('text/plain', 'text/html'): + if XMPPGatewayConfig.use_msrp_for_chat: + message = NormalMessage(sender, recipient, body, content_type, use_receipt=False) + self.xmpp_manager.send_stanza(message) + else: + message = ChatMessage(sender, recipient, body, content_type, use_receipt=False) + self.xmpp_manager.send_stanza(message) + elif content_type == IsComposingDocument.content_type: + if not XMPPGatewayConfig.use_msrp_for_chat: + try: + msg = IsComposingMessage.parse(body) + except ParserError: + pass + else: + state = 'composing' if msg.state == 'active' else 'paused' + message = ChatComposingIndication(sender, recipient, state, use_receipt=False) + self.xmpp_manager.send_stanza(message) + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + # Out of band XMPP stanza handling + + @run_in_green_thread + def _NH_XMPPGotChatMessage(self, notification): + # This notification is only processed here untill the ChatSessionHandler + # has both (SIP and XMPP) sessions established + message = notification.data.message + sender = message.sender + recipient = message.recipient + if XMPPGatewayConfig.use_msrp_for_chat: + if recipient.uri.resource is None: + # If recipient resource is not set the session is started from + # the XMPP side + sip_leg_uri = FrozenURI.new(recipient.uri) + xmpp_leg_uri = FrozenURI.new(sender.uri) + try: + handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] + except KeyError: + # Not found, need to create a new handler and a outgoing SIP session + xmpp_identity = Identity(xmpp_leg_uri) + handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) + key = (sip_leg_uri, xmpp_leg_uri) + self.pending_sessions[key] = handler + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + handler.xmpp_message_queue.append(message) + else: + # Find handler pending XMPP confirmation + sip_leg_uri = FrozenURI.new(recipient.uri) + xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) + try: + handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] + except KeyError: + # Find handler pending XMPP confirmation + sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) + xmpp_leg_uri = FrozenURI.new(sender.uri) + try: + handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] + except KeyError: + # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID + xmpp_identity = Identity(xmpp_leg_uri) + handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) + key = (sip_leg_uri, xmpp_leg_uri) + self.pending_sessions[key] = handler + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + handler.xmpp_message_queue.append(message) + else: + # Found handle, create XMPP session and establish session + session = XMPPChatSession(local_identity=recipient, remote_identity=sender) + handler.xmpp_message_queue.append(message) + handler.xmpp_identity = session.remote_identity + handler.xmpp_session = session + else: + sip_message_sender = SIPMessageSender(message) + try: + sip_message_sender.send().wait() + except SIPMessageError as e: + # TODO report back an error stanza + log.error('Error sending SIP MESSAGE: %s' % e) + + @run_in_green_thread + def _NH_XMPPGotNormalMessage(self, notification): + message = notification.data.message + sip_message_sender = SIPMessageSender(message) + try: + sip_message_sender.send().wait() + except SIPMessageError as e: + # TODO report back an error stanza + log.error('Error sending SIP MESSAGE: %s' % e) + + @run_in_green_thread + def _NH_XMPPGotComposingIndication(self, notification): + composing_indication = notification.data.composing_indication + sender = composing_indication.sender + recipient = composing_indication.recipient + if not XMPPGatewayConfig.use_msrp_for_chat: + state = 'active' if composing_indication.state == 'composing' else 'idle' + body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() + message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) + sip_message_sender = SIPMessageSender(message) + try: + sip_message_sender.send().wait() + except SIPMessageError as e: + # TODO report back an error stanza + log.error('Error sending SIP MESSAGE: %s' % e) + + def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): + stanza = notification.data.stanza + # Disregard the resource part, the presence request could be a probe instead of a subscribe + sender_uri = stanza.sender.uri + sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) + try: + handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] + except KeyError: + xmpp_identity = stanza.sender + xmpp_identity.uri = sender_uri_bare + sip_identity = stanza.recipient + handler = X2SPresenceHandler(sip_identity, xmpp_identity) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + handler.start() + + # Chat session handling + + def _NH_ChatSessionDidStart(self, notification): + handler = notification.sender + log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + for k,v in self.pending_sessions.items(): + if v is handler: + del self.pending_sessions[k] + break + self.chat_sessions.add(handler) + + def _NH_ChatSessionDidEnd(self, notification): + handler = notification.sender + log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + self.chat_sessions.remove(handler) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=handler) + + def _NH_ChatSessionDidFail(self, notification): + handler = notification.sender + uris = None + for k,v in self.pending_sessions.items(): + if v is handler: + uris = k + del self.pending_sessions[k] + break + sip_uri, xmpp_uri = uris + log.msg('Chat session failed sip:%s <--> xmpp:%s' % (sip_uri, xmpp_uri)) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=handler) + + # Presence handling + + def _NH_S2XPresenceHandlerDidStart(self, notification): + handler = notification.sender + log.msg('Presence session established sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + self.s2x_presence_subscriptions[(handler.sip_identity.uri, handler.xmpp_identity.uri)] = handler + + def _NH_S2XPresenceHandlerDidEnd(self, notification): + handler = notification.sender + log.msg('Presence session ended sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=handler) + + def _NH_X2SPresenceHandlerDidStart(self, notification): + handler = notification.sender + log.msg('Presence session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) + self.x2s_presence_subscriptions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler + + def _NH_X2SPresenceHandlerDidEnd(self, notification): + handler = notification.sender + log.msg('Presence session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) + self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=handler) + + diff --git a/sylk/applications/xmppgateway/configuration.py b/sylk/applications/xmppgateway/configuration.py new file mode 100644 index 0000000..0316fb6 --- /dev/null +++ b/sylk/applications/xmppgateway/configuration.py @@ -0,0 +1,22 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from application.system import host +from application.configuration import ConfigSection, ConfigSetting +from application.configuration.datatypes import StringList +from sipsimple.configuration.datatypes import NonNegativeInteger + +from sylk.configuration.datatypes import IPAddress, Port + + +class XMPPGatewayConfig(ConfigSection): + __cfgfile__ = 'xmppgateway.ini' + __section__ = 'general' + + local_ip = ConfigSetting(type=IPAddress, value=host.default_ip) + local_port = ConfigSetting(type=Port, value=5269) + trace_xmpp = False + domains = ConfigSetting(type=StringList, value='') + sip_session_timeout = ConfigSetting(type=NonNegativeInteger, value=600) + use_msrp_for_chat = True + diff --git a/sylk/applications/xmppgateway/datatypes.py b/sylk/applications/xmppgateway/datatypes.py new file mode 100644 index 0000000..e3191ff --- /dev/null +++ b/sylk/applications/xmppgateway/datatypes.py @@ -0,0 +1,166 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import hashlib +import random +import string + +from application.python.descriptor import WriteOnceAttribute +from sipsimple.core import BaseSIPURI, SIPURI, SIPCoreError +from twisted.words.protocols.jabber.jid import JID + + +sylkserver_prefix = hashlib.md5('sylkserver').hexdigest() + +def generate_sylk_resource(): + r = 'sylk-'+''.join(random.choice(string.ascii_letters+string.digits) for x in range(32)) + return r.encode('hex') + +def is_sylk_resource(r): + if r.startswith('urn:uuid:') or len(r) != 74: + return False + try: + decoded = r.decode('hex') + except TypeError: + return False + else: + return decoded.startswith('sylk-') + +def encode_resource(r): + return r.encode('utf-8').encode('hex') + +def decode_resource(r): + return r.decode('hex').decode('utf-8') + + +class BaseURI(object): + def __init__(self, user, host, resource=None): + self.user = user + self.host = host + self.resource = resource + + @classmethod + def parse(cls, value): + if isinstance(value, BaseSIPURI): + user = unicode(value.user) + host = unicode(value.host) + resource = unicode(value.parameters.get('gr', '')) or None + return cls(user, host, resource) + elif isinstance(value, JID): + user = value.user + host = value.host + resource = value.resource + return cls(user, host, resource) + elif not isinstance(value, basestring): + raise TypeError('uri needs to be a string') + if not value.startswith(('sip:', 'sips:', 'xmpp:')): + raise ValueError('invalid uri scheme for %s' % value) + if value.startswith(('sip:', 'sips:')): + try: + uri = SIPURI.parse(value) + except SIPCoreError: + raise ValueError('invalid SIP uri: %s' % value) + user = unicode(uri.user) + host = unicode(uri.host) + resource = unicode(uri.parameters.get('gr', '')) or None + else: + try: + jid = JID(value[5:]) + except Exception: + raise ValueError('invalid XMPP uri: %s' % value) + user = jid.user + host = jid.host + resource = jid.resource + return cls(user, host, resource) + + @classmethod + def new(cls, uri): + if not isinstance(uri, BaseURI): + raise TypeError('%s is not a valid URI type' % type(uri)) + return cls(uri.user, uri.host, uri.resource) + + def as_sip_uri(self): + uri = SIPURI(user=str(self.user), host=str(self.host)) + if self.resource is not None: + uri.parameters['gr'] = self.resource.encode('utf-8') + return uri + + def as_xmpp_jid(self): + jid = JID(tuple=(self.user, self.host, self.resource)) + return jid + + def as_string(self, protocol): + if protocol not in ('sip', 'xmpp'): + raise ValueError('protocol must be one of "sip" or "xmpp"') + if protocol == 'sip': + uri = self.as_sip_uri() + return unicode(str(uri)) + else: + uri = self.as_xmpp_jid() + return unicode(uri) + + def __eq__(self, other): + if isinstance(other, BaseURI): + return self.user == other.user and self.host == other.host and self.resource == other.resource + elif isinstance(other, basestring): + try: + other = BaseURI.parse(other) + except ValueError: + return False + else: + return self.user == other.user and self.host == other.host and self.resource == other.resource + else: + return NotImplemented + + def __ne__(self, other): + equal = self.__eq__(other) + return NotImplemented if equal is NotImplemented else not equal + + def __repr__(self): + return '%s(user=%r, host=%r, resource=%r)' % (self.__class__.__name__, self.user, self.host, self.resource) + + def __unicode__(self): + return u'%s@%s' % (self.user, self.host) + + def __str__(self): + return unicode(self).encode('utf-8') + + +class URI(BaseURI): + pass + + +class FrozenURI(BaseURI): + user = WriteOnceAttribute() + host = WriteOnceAttribute() + resource = WriteOnceAttribute() + + def __hash__(self): + return hash((self.user, self.host, self.resource)) + + +class Identity(object): + def __init__(self, uri, display_name=None): + self.uri = uri + self.display_name = display_name + + def __eq__(self, other): + if isinstance(other, Identity): + return self.uri == other.uri and self.display_name == other.display_name + else: + return NotImplemented + + def __ne__(self, other): + equal = self.__eq__(other) + return NotImplemented if equal is NotImplemented else not equal + + def __unicode__(self): + if self.display_name is not None: + return u'%s <%s>' % (self.display_name, self.uri) + else: + return u'%s' % self.uri + + def __str__(self): + return unicode(self).encode('utf-8') + + diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py new file mode 100644 index 0000000..9c2cffa --- /dev/null +++ b/sylk/applications/xmppgateway/im.py @@ -0,0 +1,436 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import os + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.descriptor import WriteOnceAttribute +from collections import deque +from eventlet import coros +from sipsimple.account import AccountManager +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import SIPURI +from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader +from sipsimple.core import Message as SIPMessageRequest +from sipsimple.lookup import DNSLookup, DNSLookupError +from sipsimple.streams.applications.chat import CPIMIdentity +from sipsimple.threading import run_in_twisted_thread +from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread +from sipsimple.util import TimestampedNotificationData +from twisted.internet import reactor +from zope.interface import implements + +from sylk.applications import ApplicationLogger +from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource +from sylk.applications.xmppgateway.xmpp import ChatMessage, XMPPChatSession +from sylk.applications.xmppgateway.xmpp import XMPPManager +from sylk.extensions import ChatStream +from sylk.session import ServerSession + +log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) + + +__all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] + + +SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout + + +class ChatSessionHandler(object): + implements(IObserver) + + sip_identity = WriteOnceAttribute() + xmpp_identity = WriteOnceAttribute() + + def __init__(self): + self.started = False + self.ended = False + self.sip_session = None + self.msrp_stream = None + self._sip_session_timer = None + + self.use_receipts = False + self.xmpp_session = None + self.xmpp_message_queue = deque() + + self._pending_msrp_chunks = {} + self._pending_xmpp_stanzas = {} + + def _set_started(self, value): + old_value = self.__dict__.get('started', False) + self.__dict__['started'] = value + if not old_value and value: + notification_center = NotificationCenter() + notification_center.post_notification('ChatSessionDidStart', sender=self, data=TimestampedNotificationData()) + self._send_queued_messages() + def _get_started(self): + return self.__dict__['started'] + started = property(_get_started, _set_started) + del _get_started, _set_started + + def _set_xmpp_session(self, session): + self.__dict__['xmpp_session'] = session + if session is not None: + # Reet SIP session timer in case it's active + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=session) + session.start() + # Reet SIP session timer in case it's active + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + def _get_xmpp_session(self): + return self.__dict__['xmpp_session'] + xmpp_session = property(_get_xmpp_session, _set_xmpp_session) + del _get_xmpp_session, _set_xmpp_session + + @classmethod + def new_from_sip_session(cls, sip_identity, session): + instance = cls() + instance.sip_identity = sip_identity + instance._start_incoming_sip_session(session) + return instance + + @classmethod + def new_from_xmpp_stanza(cls, xmpp_identity, recipient): + instance = cls() + instance.xmpp_identity = xmpp_identity + instance._start_outgoing_sip_session(recipient) + return instance + + @run_in_green_thread + def _start_incoming_sip_session(self, session): + self.sip_session = session + self.msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self.sip_session) + notification_center.add_observer(self, sender=self.msrp_stream) + self.sip_session.accept([self.msrp_stream]) + + @run_in_green_thread + def _start_outgoing_sip_session(self, target_uri): + notification_center = NotificationCenter() + # self.xmpp_identity is our local identity + from_uri = self.xmpp_identity.uri.as_sip_uri() + del from_uri.parameters['gr'] # no GRUU in From header + contact_uri = self.xmpp_identity.uri.as_sip_uri() + contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) + to_uri = target_uri.as_sip_uri() + lookup = DNSLookup() + settings = SIPSimpleSettings() + account = AccountManager().default_account + if account.sip.outbound_proxy is not None: + uri = SIPURI(host=account.sip.outbound_proxy.host, + port=account.sip.outbound_proxy.port, + parameters={'transport': account.sip.outbound_proxy.transport}) + else: + uri = to_uri + try: + routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() + except DNSLookupError: + log.warning('DNS lookup error while looking for %s proxy' % uri) + notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + return + self.msrp_stream = ChatStream(account) + route = routes.pop(0) + from_header = FromHeader(from_uri) + to_header = ToHeader(to_uri) + contact_header = ContactHeader(contact_uri) + self.sip_session = ServerSession(account) + notification_center.add_observer(self, sender=self.sip_session) + notification_center.add_observer(self, sender=self.msrp_stream) + self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream]) + + def end(self): + if self.ended: + return + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.cancel() + self._sip_session_timer = None + notification_center = NotificationCenter() + if self.sip_session is not None: + notification_center.remove_observer(self, sender=self.sip_session) + notification_center.remove_observer(self, sender=self.msrp_stream) + self.sip_session.end() + self.sip_session = None + self.msrp_stream = None + if self.xmpp_session is not None: + notification_center.remove_observer(self, sender=self.xmpp_session) + self.xmpp_session.end() + self.xmpp_session = None + self.ended = True + if self.started: + notification_center.post_notification('ChatSessionDidEnd', sender=self, data=TimestampedNotificationData()) + else: + notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + + def _send_queued_messages(self): + if self.xmpp_message_queue: + while self.xmpp_message_queue: + message = self.xmpp_message_queue.popleft() + if message.content_type not in ('text/plain', 'text/html'): + continue + if not message.use_receipt: + success_report = 'no' + failure_report = 'no' + else: + success_report = 'yes' + failure_report = 'yes' + sender_uri = message.sender.uri.as_sip_uri() + sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) + sender = CPIMIdentity(sender_uri) + self.msrp_stream.send_message(message.body, message.content_type, local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) + self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) + + def _inactivity_timeout(self): + log.msg("Ending SIP session %s due to incactivity" % self.sip_session._invitation.call_id) + self.sip_session.end() + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPSessionDidStart(self, notification): + log.msg("SIP session %s started" % notification.sender._invitation.call_id) + self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) + + if self.sip_session.direction == 'outgoing': + # Time to set sip_identity and create the XMPPChatSession + contact_uri = self.sip_session._invitation.remote_contact_header.uri + if contact_uri.parameters.get('gr') is not None: + sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) + else: + tmp = self.sip_session.remote_identity.uri + sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) + self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) + session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) + self.xmpp_session = session + # Session is now established on both ends + self.started = True + # Send an 'active' message stanza to wakeup XMPP clients + self.xmpp_session.send_composing_indication('active') + else: + if self.xmpp_session is not None: + # Session is now established on both ends + self.started = True + # Send an 'active' message stanza to wakeup XMPP clients + self.xmpp_session.send_composing_indication('active') + else: + # Send queued messages + self._send_queued_messages() + + def _NH_SIPSessionDidEnd(self, notification): + log.msg("SIP session %s ended" % notification.sender._invitation.call_id) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.sip_session) + notification_center.remove_observer(self, sender=self.msrp_stream) + self.sip_session = None + self.msrp_stream = None + self.end() + + def _NH_SIPSessionDidFail(self, notification): + log.msg("SIP session %s failed" % notification.sender._invitation.call_id) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.sip_session) + notification_center.remove_observer(self, sender=self.msrp_stream) + self.sip_session = None + self.msrp_stream = None + self.end() + + def _NH_SIPSessionGotProposal(self, notification): + self.sip_session.reject_proposal() + + def _NH_SIPSessionTransferNewIncoming(self, notification): + self.sip_session.reject_transfer(403) + + def _NH_ChatStreamGotMessage(self, notification): + # Notification is sent by the MSRP stream + message = notification.data.message + content_type = message.content_type.lower() + if content_type in ('text/plain', 'text/html'): + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + chunk = notification.data.chunk + if self.started: + self.xmpp_session.send_message(message.body, message.content_type, message_id=chunk.message_id) + if self.use_receipts: + self._pending_msrp_chunks[chunk.message_id] = chunk + else: + self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') + else: + sender = self.sip_identity + recipient_uri = FrozenURI.parse(message.recipients[0].uri) + recipient = Identity(recipient_uri, message.recipients[0].display_name) + xmpp_manager = XMPPManager() + xmpp_manager.send_stanza(ChatMessage(sender, recipient, message.body, message.content_type)) + self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') + + def _NH_ChatStreamGotComposingIndication(self, notification): + # Notification is sent by the MSRP stream + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + if not self.started: + return + state = None + if notification.data.state == 'active': + state = 'composing' + elif notification.data.state == 'idle': + state = 'paused' + if state is not None: + self.xmpp_session.send_composing_indication(state) + + def _NH_ChatStreamDidDeliverMessage(self, notification): + if self.started: + message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) + if message is not None: + self.xmpp_session.send_receipt_acknowledgement(message.id) + + def _NH_ChatStreamDidNotDeliverMessage(self, notification): + if self.started: + message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) + if message is not None: + self.xmpp_session.send_error(message, 'TODO', []) # TODO + + def _NH_XMPPChatSessionDidStart(self, notification): + if self.sip_session is not None: + # Session is now established on both ends + self.started = True + + def _NH_XMPPChatSessionDidEnd(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.xmpp_session) + self.xmpp_session = None + self.end() + + def _NH_XMPPChatSessionGotMessage(self, notification): + if self.sip_session is None or self.sip_session.state != 'connected': + self._xmpp_message_queue.append(notification.data.message) + return + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + message = notification.data.message + sender_uri = message.sender.uri.as_sip_uri() + del sender_uri.parameters['gr'] # no GRUU in CPIM From header + sender = CPIMIdentity(sender_uri) + self.use_receipts = message.use_receipt + if not message.use_receipt: + success_report = 'no' + failure_report = 'no' + else: + success_report = 'yes' + failure_report = 'yes' + self._pending_xmpp_stanzas[message.id] = message + self.msrp_stream.send_message(message.body, message.content_type, local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) + self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) + + def _NH_XMPPChatSessionGotComposingIndication(self, notification): + if self.sip_session is None or self.sip_session.state != 'connected': + return + if self._sip_session_timer is not None and self._sip_session_timer.active(): + self._sip_session_timer.reset(SESSION_TIMEOUT) + message = notification.data.message + state = None + if message.state == 'composing': + state = 'active' + elif message.state == 'paused': + state = 'idle' + if state is not None: + sender_uri = message.sender.uri.as_sip_uri() + del sender_uri.parameters['gr'] # no GRUU in CPIM From header + sender = CPIMIdentity(sender_uri) + self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) + if message.use_receipt: + self.xmpp_session.send_receipt_acknowledgement(message.id) + + def _NH_XMPPChatSessionDidDeliverMessage(self, notification): + chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) + if chunk is not None: + self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') + + def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): + chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) + if chunk is not None: + self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) + + +def chunks(text, size): + for i in xrange(0, len(text), size): + yield text[i:i+size] + +class SIPMessageError(Exception): + def __init__(self, code, reason): + Exception.__init__(self, reason) + self.code = code + self.reason = reason + +class SIPMessageSender(object): + implements(IObserver) + + def __init__(self, message): + self.from_uri = message.sender.uri.as_sip_uri() + self.from_uri.parameters.pop('gr', None) # No GRUU in From header + self.to_uri = message.recipient.uri.as_sip_uri() + self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU + self.body = message.body + self.content_type = message.content_type + self._requests = set() + self._channel = coros.queue() + + @run_in_waitable_green_thread + def send(self): + lookup = DNSLookup() + settings = SIPSimpleSettings() + account = AccountManager().default_account + if account.sip.outbound_proxy is not None: + uri = SIPURI(host=account.sip.outbound_proxy.host, + port=account.sip.outbound_proxy.port, + parameters={'transport': account.sip.outbound_proxy.transport}) + else: + uri = self.to_uri + try: + routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() + except DNSLookupError: + msg = 'DNS lookup error while looking for %s proxy' % uri + log.warning(msg) + raise SIPMessageError(0, msg) + else: + route = routes.pop(0) + from_header = FromHeader(self.from_uri) + to_header = ToHeader(self.to_uri) + route_header = RouteHeader(route.get_uri()) + notification_center = NotificationCenter() + for chunk in chunks(self.body, 1000): + request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) + notification_center.add_observer(self, sender=request) + self._requests.add(request) + request.send() + error = None + count = len(self._requests) + while count > 0: + notification = self._channel.wait() + if notification.name == 'SIPMessageDidFail': + error = (notification.data.code, notification.data.reason) + count -= 1 + self._requests.clear() + if error is not None: + raise SIPMessageError(*error) + + @run_in_twisted_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPMessageDidSucceed(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=notification.sender) + self._channel.send(notification) + + def _NH_SIPMessageDidFail(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=notification.sender) + self._channel.send(notification) + + diff --git a/sylk/applications/xmppgateway/logger.py b/sylk/applications/xmppgateway/logger.py new file mode 100644 index 0000000..a6fc2ec --- /dev/null +++ b/sylk/applications/xmppgateway/logger.py @@ -0,0 +1,103 @@ +# Copyright (C) 2010-2011 AG Projects. See LICENSE for details. +# + +""" +Logging support for XMPP traffic. +""" + +__all__ = ["Logger"] + +import os +import sys + +from application.python.queue import EventQueue +from application.system import makedirs +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.threading import run_in_thread + + +class Logger(object): + + def __init__(self): + self.stopped = False + self._xmpptrace_filename = None + self._xmpptrace_file = None + self._xmpptrace_error = False + self._xmpptrace_start_time = None + self._xmpptrace_packet_count = 0 + + self._log_directory_error = False + + def start(self): + # try to create the log directory + try: + self._init_log_directory() + except Exception: + pass + self.stopped = False + + def stop(self): + self.stopped = True + self._close_file() + + @run_in_thread('log-io') + def _close_file(self): + if self._xmpptrace_file is not None: + self._xmpptrace_file.close() + self._xmpptrace_file = None + + def msg(self, direction, timestamp, packet): + if self._xmpptrace_start_time is None: + self._xmpptrace_start_time = timestamp + self._xmpptrace_packet_count += 1 + buf = ["%s: Packet %d, +%s" % (direction, self._xmpptrace_packet_count, (timestamp - self._xmpptrace_start_time))] + buf.append(packet) + buf.append('--') + message = '\n'.join(buf) + self._process_log((message, timestamp)) + + @run_in_thread('log-io') + def _process_log(self, record): + if self.stopped: + return + message, timestamp = record + try: + self._init_log_file() + except Exception: + pass + else: + self._xmpptrace_file.write('%s [%s %d]: %s\n' % (timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._xmpptrace_file.flush() + + def _init_log_directory(self): + settings = SIPSimpleSettings() + log_directory = settings.logs.directory.normalized + try: + makedirs(log_directory) + except Exception, e: + if not self._log_directory_error: + print "failed to create logs directory '%s': %s" % (log_directory, e) + self._log_directory_error = True + self._xmpptrace_error = True + raise + else: + self._log_directory_error = False + if self._xmpptrace_filename is None: + self._xmpptrace_filename = os.path.join(log_directory, 'xmpp_trace.txt') + self._xmpptrace_error = False + + def _init_log_file(self): + if self._xmpptrace_file is None: + self._init_log_directory() + filename = self._xmpptrace_filename + try: + self._xmpptrace_file = open(filename, 'a') + except Exception, e: + if not self._xmpptrace_error: + print "failed to create log file '%s': %s" % (filename, e) + self._xmpptrace_error = True + raise + else: + self._xmpptrace_error = False + + diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py new file mode 100644 index 0000000..959c49d --- /dev/null +++ b/sylk/applications/xmppgateway/presence.py @@ -0,0 +1,480 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import hashlib +import os +import random +import urllib + +from application.notification import IObserver, NotificationCenter +from application.python import Null, limit +from application.python.descriptor import WriteOnceAttribute +from eventlet import coros, proc +from sipsimple.account import AccountManager +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import SIPURI, SIPCoreError +from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader +from sipsimple.core import Subscription +from sipsimple.lookup import DNSLookup, DNSLookupError +from sipsimple.payloads import pidf, rpid +from sipsimple.payloads import ParserError +from sipsimple.threading import run_in_twisted_thread +from sipsimple.threading.green import Command, run_in_green_thread +from sipsimple.util import TimestampedNotificationData +from time import time +from twisted.internet import reactor +from zope.interface import implements + +from sylk.applications import ApplicationLogger +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource +from sylk.applications.xmppgateway.xmpp import AvailabilityPresence, XMPPSubscription, XMPPIncomingSubscription + +log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) + + +__all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] + + +class S2XPresenceHandler(object): + implements(IObserver) + + sip_identity = WriteOnceAttribute() + xmpp_identity = WriteOnceAttribute() + + def __init__(self, sip_identity, xmpp_identity): + self.ended = False + self._sip_subscriptions = [] + self._stanza_cache = {} + self._pidf = None + self._xmpp_subscription = None + self.sip_identity = sip_identity + self.xmpp_identity = xmpp_identity + + def start(self): + notification_center = NotificationCenter() + self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) + notification_center.add_observer(self, sender=self._xmpp_subscription) + self._xmpp_subscription.start() + notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) + + def end(self): + if self.ended: + return + notification_center = NotificationCenter() + if self._xmpp_subscription is not None: + notification_center.remove_observer(self, sender=self._xmpp_subscription) + self._xmpp_subscription.end() + self._xmpp_subscription = None + while self._sip_subscriptions: + subscription = self._sip_subscriptions.pop() + notification_center.remove_observer(self, sender=subscription) + try: + subscription.end() + except SIPCoreError: + pass + self.ended = True + notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + + def add_sip_subscription(self, subscription): + self._sip_subscriptions.append(subscription) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=subscription) + if self._xmpp_subscription.state == 'active': + pidf_doc = self._pidf + content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None + subscription.accept(content_type, pidf_doc) + else: + subscription.accept_pending() + + def _build_pidf(self): + if not self._stanza_cache: + self._pidf = None + return None + pidf_doc = pidf.PIDF(str(self.xmpp_identity)) + uri = self._stanza_cache.iterkeys().next() + person = pidf.Person("ID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) + person.activities = rpid.Activities() + pidf_doc.add(person) + for stanza in self._stanza_cache.itervalues(): + if not stanza.available: + status = pidf.Status('closed') + status.extended = 'offline' + else: + status = pidf.Status('open') + if stanza.show == 'away': + status.extended = 'away' + if 'away' not in person.activities: + person.activities.add('away') + elif stanza.show == 'xa': + status.extended = 'extended-away' + if 'away' not in person.activities: + person.activities.add('away') + elif stanza.show == 'dnd': + stanza.extended = 'busy' + if 'busy' not in person.activities: + person.activities.add('busy') + else: + stanza.extended = 'available' + resource = encode_resource(stanza.sender.uri.resource) + device_id = pidf.DeviceID(resource) + tuple_id = "ID-%s" % resource + sip_uri = stanza.sender.uri.as_sip_uri() + sip_uri.parameters['gr'] = resource + contact = pidf.Contact(str(sip_uri)) + tuple = pidf.Service(tuple_id, status=status, contact=contact, device_id=device_id) + tuple.device_info = pidf.DeviceInfo(resource, description=urllib.quote(stanza.sender.uri.resource.encode('utf-8'))) + for lang, note in stanza.statuses.iteritems(): + tuple.notes.add(pidf.PIDFNote(note, lang=lang)) + pidf_doc.add(tuple) + if not person.activities: + person.activities = None + self._pidf = pidf_doc.toxml() + return self._pidf + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + @run_in_twisted_thread + def _NH_SIPIncomingSubscriptionDidEnd(self, notification): + notification_center = NotificationCenter() + subscription = notification.sender + notification_center.remove_observer(self, sender=subscription) + self._sip_subscriptions.remove(subscription) + if not self._sip_subscriptions: + self.end() + + def _NH_XMPPSubscriptionChangedState(self, notification): + if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': + pidf_doc = self._pidf + content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None + for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): + subscription.accept(content_type, pidf_doc) + + def _NH_XMPPSubscriptionGotNotify(self, notification): + stanza = notification.data.presence + self._stanza_cache[stanza.sender.uri] = stanza + pidf_doc = self._build_pidf() + for subscription in self._sip_subscriptions: + try: + subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) + except SIPCoreError: + pass + if not stanza.available: + # Only inform once about this device being unavailable + del self._stanza_cache[stanza.sender.uri] + + def _NH_XMPPSubscriptionDidFail(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._xmpp_subscription) + self._xmpp_subscription = None + self.end() + + _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail + + +class InterruptSubscription(Exception): pass + +class TerminateSubscription(Exception): pass + +class SubscriptionError(Exception): + def __init__(self, error, timeout, refresh_interval=None, fatal=False): + self.error = error + self.refresh_interval = refresh_interval + self.timeout = timeout + self.fatal = fatal + +class SIPSubscriptionDidFail(Exception): + def __init__(self, data): + self.data = data + +class X2SPresenceHandler(object): + implements(IObserver) + + sip_identity = WriteOnceAttribute() + xmpp_identity = WriteOnceAttribute() + + def __init__(self, sip_identity, xmpp_identity): + self.ended = False + self.sip_identity = sip_identity + self.xmpp_identity = xmpp_identity + self.subscribed = False + self._command_proc = None + self._command_channel = coros.queue() + self._data_channel = coros.queue() + self._sip_subscription = None + self._sip_subscription_proc = None + self._sip_subscription_timer = None + self._xmpp_subscription = None + + def start(self): + notification_center = NotificationCenter() + self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) + notification_center.add_observer(self, sender=self._xmpp_subscription) + self._xmpp_subscription.start() + self._command_proc = proc.spawn(self._run) + self._subscribe_sip() + notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) + + def end(self): + if self.ended: + return + notification_center = NotificationCenter() + if self._xmpp_subscription is not None: + notification_center.remove_observer(self, sender=self._xmpp_subscription) + self._xmpp_subscription.end() + self._xmpp_subscription = None + if self._sip_subscription: + self._unsubscribe_sip() + self.ended = True + notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + + @run_in_green_thread + def _subscribe_sip(self): + command = Command('subscribe') + self._command_channel.send(command) + + @run_in_green_thread + def _unsubscribe_sip(self): + command = Command('unsubscribe') + self._command_channel.send(command) + command.wait() + self._command_proc.kill() + self._command_proc = None + + def _run(self): + while True: + command = self._command_channel.wait() + handler = getattr(self, '_CH_%s' % command.name) + handler(command) + + def _CH_subscribe(self, command): + if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): + self._sip_subscription_timer.cancel() + self._sip_subscription_timer = None + if self._sip_subscription_proc is not None: + subscription_proc = self._sip_subscription_proc + subscription_proc.kill(InterruptSubscription) + subscription_proc.wait() + self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) + + def _CH_unsubscribe(self, command): + # Cancel any timer which would restart the subscription process + if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): + self._sip_subscription_timer.cancel() + self._sip_subscription_timer = None + if self._sip_subscription_proc is not None: + subscription_proc = self._sip_subscription_proc + subscription_proc.kill(TerminateSubscription) + subscription_proc.wait() + self._sip_subscription_proc = None + command.signal() + + def _process_pidf(self, body): + try: + pidf_doc = pidf.PIDF.parse(body) + except ParserError, e: + log.warn('Error parsing PIDF document: %s' % e) + return + # Build XML stanzas out of PIDF documents + try: + person = (p for p in pidf_doc.persons).next() + except StopIteration: + person = None + for service in pidf_doc.services: + sip_contact = self.sip_identity.uri.as_sip_uri() + if service.device_info is not None: + sip_contact.parameters['gr'] = service.device_info.id + else: + sip_contact.parameters['gr'] = service.id # TODO: pseudorandom thing with AoR? + sender = Identity(FrozenURI.parse(sip_contact)) + if service.status.extended is not None: + available = service.status.extended != 'offline' + else: + available = service.status.basic == 'open' + stanza = AvailabilityPresence(sender, self.xmpp_identity, available) + for note in service.notes: + stanza.statuses[note.lang] = note + if service.status.extended is not None: + if service.status.extended == 'away': + stanza.show = 'away' + elif service.status.extended == 'extended-away': + stanza.show = 'xa' + elif service.status.extended == 'busy': + stanza.show = 'dnd' + elif person is not None and person.activities is not None: + activities = set(list(person.activities)) + if 'away' in activities: + stanza.show = 'away' + elif set(('holiday', 'vacation')).intersection(activities): + stanza.show = 'xa' + elif 'busy' in activities: + stanza.show = 'dnd' + self._xmpp_subscription.send_presence(stanza) + + def _sip_subscription_handler(self, command): + notification_center = NotificationCenter() + settings = SIPSimpleSettings() + + account = AccountManager().default_account + refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval + + try: + # Lookup routes + if account.sip.outbound_proxy is not None: + uri = SIPURI(host=account.sip.outbound_proxy.host, + port=account.sip.outbound_proxy.port, + parameters={'transport': account.sip.outbound_proxy.transport}) + else: + uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) + lookup = DNSLookup() + try: + routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() + except DNSLookupError, e: + timeout = random.uniform(15, 30) + raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) + + timeout = time() + 30 + for route in routes: + remaining_time = timeout - time() + if remaining_time > 0: + try: + contact_uri = account.contact[route] + except KeyError: + continue + subscription_uri = self.sip_identity.uri.as_sip_uri() + subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), + ToHeader(subscription_uri), + ContactHeader(contact_uri), + 'presence', + RouteHeader(route.get_uri()), + refresh=refresh_interval) + notification_center.add_observer(self, sender=subscription) + try: + subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) + except SIPCoreError: + notification_center.remove_observer(self, sender=subscription) + raise SubscriptionError(error='Internal error', timeout=5) + self._sip_subscription = subscription + try: + while True: + notification = self._data_channel.wait() + if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': + break + except SIPSubscriptionDidFail, e: + notification_center.remove_observer(self, sender=subscription) + self._sip_subscription = None + if e.data.code == 407: + # Authentication failed, so retry the subscription in some time + raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) + elif e.data.code == 403: + # Forbidden + raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) + elif e.data.code == 423: + # Get the value of the Min-Expires header + if e.data.min_expires is not None and e.data.min_expires > refresh_interval: + interval = e.data.min_expires + else: + interval = None + raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) + elif e.data.code in (405, 406, 489): + raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) + elif e.data.code == 1400: + raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) + else: + # Otherwise just try the next route + continue + else: + self.subscribed = True + command.signal() + break + else: + # There are no more routes to try, give up + raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) + # At this point it is subscribed. Handle notifications and ending/failures. + try: + while True: + notification = self._data_channel.wait() + if notification.sender is not self._sip_subscription: + continue + if notification.name == 'SIPSubscriptionGotNotify': + if notification.data.event == 'presence': + subscription_state = notification.data.headers.get('Subscription-State').state + if subscription_state == 'active' and self._xmpp_subscription.state != 'active': + self._xmpp_subscription.accept() + elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': + # The state went from active to pending, hide the presence state? + pass + if notification.data.body: + self._process_pidf(notification.data.body) + elif notification.name == 'SIPSubscriptionDidEnd': + break + except SIPSubscriptionDidFail, e: + if e.data.code == 0 and e.data.reason == 'rejected': + self._xmpp_subscription.reject() + else: + self._command_channel.send(Command('subscribe')) + notification_center.remove_observer(self, sender=self._sip_subscription) + except InterruptSubscription, e: + if not self.subscribed: + command.signal(e) + if self._sip_subscription is not None: + notification_center.remove_observer(self, sender=self._sip_subscription) + try: + self._sip_subscription.end(timeout=2) + except SIPCoreError: + pass + except TerminateSubscription, e: + if not self.subscribed: + command.signal(e) + if self._sip_subscription is not None: + try: + self._sip_subscription.end(timeout=2) + except SIPCoreError: + pass + else: + try: + while True: + notification = self._data_channel.wait() + if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': + break + except SIPSubscriptionDidFail: + pass + finally: + notification_center.remove_observer(self, sender=self._sip_subscription) + except SubscriptionError, e: + if not e.fatal: + self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) + finally: + self.subscribed = False + self._sip_subscription = None + self._sip_subscription_proc = None + reactor.callLater(0, self.end) + + @run_in_twisted_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPSubscriptionDidStart(self, notification): + self._data_channel.send(notification) + + def _NH_SIPSubscriptionDidEnd(self, notification): + self._data_channel.send(notification) + + def _NH_SIPSubscriptionDidFail(self, notification): + self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) + + def _NH_SIPSubscriptionGotNotify(self, notification): + self._data_channel.send(notification) + + def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): + self.end() + + def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): + if self._sip_subscription.state.lower() == 'active': + self._xmpp_subscription.accept() + + _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe + + diff --git a/sylk/applications/xmppgateway/xmpp.py b/sylk/applications/xmppgateway/xmpp.py new file mode 100644 index 0000000..00e40d5 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp.py @@ -0,0 +1,861 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.descriptor import WriteOnceAttribute +from application.python.types import Singleton +from datetime import datetime +from eventlet import coros, proc +from sipsimple.util import Timestamp, TimestampedNotificationData +from twisted.internet import reactor +from twisted.words.protocols.jabber.jid import internJID as JID +from twisted.words.xish import domish +from wokkel.component import InternalComponent, Router as _Router +from wokkel.server import ServerService, XMPPS2SServerFactory as _XMPPS2SServerFactory, DeferredS2SClientFactory as _DeferredS2SClientFactory +from wokkel.xmppim import MessageProtocol as _MessageProtocol, PresenceProtocol as _PresenceProtocol +from zope.interface import implements + +from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI +from sylk.applications.xmppgateway.logger import Logger + + +CHATSTATES_NS = 'http://jabber.org/protocol/chatstates' +RECEIPTS_NS = 'urn:xmpp:receipts' +STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas' +XML_NS = 'http://www.w3.org/XML/1998/namespace' + +xmpp_logger = Logger() + +# Datatypes + +class BaseStanza(object): + _stanza_type = None # to be defined by subclasses + + def __init__(self, sender, recipient, id=None, type=None): + self.sender = sender + self.recipient = recipient + self.id = id + self.type = type + + def to_xml_element(self): + xml_element = domish.Element((None, self._stanza_type)) + xml_element['from'] = self.sender.uri.as_string('xmpp') + xml_element['to'] = self.recipient.uri.as_string('xmpp') + if self.type: + xml_element['type'] = self.type + if self.id is not None: + xml_element['id'] = self.id + return xml_element + +class MessageStanza(BaseStanza): + _stanza_type = 'message' + + def __init__(self, sender, recipient, type='', id=None, use_receipt=False): + super(MessageStanza, self).__init__(sender, recipient, id=id, type=type) + self.use_receipt = use_receipt + + def to_xml_element(self): + xml_element = super(MessageStanza, self).to_xml_element() + if self.id is not None and self.recipient.uri.resource is not None and self.use_receipt: + xml_element.addElement('request', defaultUri=RECEIPTS_NS) + return xml_element + +class NormalMessage(MessageStanza): + def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=False): + super(NormalMessage, self).__init__(sender, recipient, type='', id=id, use_receipt=use_receipt) + self.body = body + self.content_type = content_type + + def to_xml_element(self): + xml_element = super(NormalMessage, self).to_xml_element() + xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? + return xml_element + +class ChatMessage(MessageStanza): + def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=True): + super(ChatMessage, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) + self.body = body + self.content_type = content_type + + def to_xml_element(self): + xml_element = super(ChatMessage, self).to_xml_element() + xml_element.addElement('active', defaultUri=CHATSTATES_NS) + xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? + return xml_element + +class ChatComposingIndication(MessageStanza): + def __init__(self, sender, recipient, state, id=None, use_receipt=False): + super(ChatComposingIndication, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) + self.state = state + + def to_xml_element(self): + xml_element = super(ChatComposingIndication, self).to_xml_element() + xml_element.addElement(self.state, defaultUri=CHATSTATES_NS) + return xml_element + +class MessageReceipt(MessageStanza): + def __init__(self, sender, recipient, receipt_id, id=None): + super(MessageReceipt, self).__init__(sender, recipient, type='', id=id, use_receipt=False) + self.receipt_id = receipt_id + + def to_xml_element(self): + xml_element = super(MessageReceipt, self).to_xml_element() + receipt_element = domish.Element((RECEIPTS_NS, 'received')) + receipt_element['id'] = self.receipt_id + xml_element.addChild(receipt_element) + return xml_element + +class PresenceStanza(BaseStanza): + _stanza_type = 'presence' + + def __init__(self, sender, recipient, type=None, id=None): + super(PresenceStanza, self).__init__(sender, recipient, type=type, id=id) + +class AvailabilityPresence(PresenceStanza): + def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None): + super(AvailabilityPresence, self).__init__(sender, recipient, id=id) + self.available = available + self.show = show + self.priority = priority + self.statuses = statuses or {} + + def _get_available(self): + return self.__dict__['available'] + def _set_available(self, available): + if available: + self.type = None + else: + self.type = 'unavailable' + self.__dict__['available'] = available + available = property(_get_available, _set_available) + del _get_available, _set_available + + @property + def status(self): + status = self.statuses.get(None) + if status is None: + try: + status = self.statuses.itervalues().next() + except StopIteration: + pass + return status + + def to_xml_element(self): + xml_element = super(PresenceStanza, self).to_xml_element() + if self.available: + if self.show is not None: + xml_element.addElement('show', content=self.show) + if self.priority != 0: + xml_element.addElement('priority', content=unicode(self.priority)) + for lang, text in self.statuses.iteritems(): + status = xml_element.addElement('status', content=text) + if lang: + status[(XML_NS, 'lang')] = lang + return xml_element + +class SubscriptionPresence(PresenceStanza): + def __init__(self, sender, recipient, type, id=None): + super(SubscriptionPresence, self).__init__(sender, recipient, type=type, id=id) + +class ProbePresence(PresenceStanza): + def __init__(self, sender, recipient, id=None): + super(ProbePresence, self).__init__(sender, recipient, type='probe', id=id) + +class ErrorStanza(object): + """ + Stanza representing an error of another stanza. It's not a base stanza type on its own. + """ + + def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): + self.stanza_type = stanza_type + self.sender = sender + self.recipient = recipient + self.id = id + self.conditions = conditions + self.error_type = error_type + + @classmethod + def from_stanza(cls, stanza, error_type, conditions): + # In error stanzas sender and recipient are swapped + return cls(stanza._stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) + + def to_xml_element(self): + xml_element = domish.Element((None, self.stanza_type)) + xml_element['from'] = self.sender.uri.as_string('xmpp') + xml_element['to'] = self.recipient.uri.as_string('xmpp') + xml_element['type'] = 'error' + if self.id is not None: + xml_element['id'] = self.id + error_element = domish.Element((None, 'error')) + error_element['type'] = self.error_type + [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] + xml_element.addChild(error_element) + return xml_element + + +# Protocols + +class MessageProtocol(_MessageProtocol): + messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' + + def _onMessage(self, message): + if message.handled: + return + messageType = message.getAttribute("type") + if messageType not in self.messageTypes: + message["type"] = 'normal' + self.onMessage(message) + + def onMessage(self, msg): + notification_center = NotificationCenter() + + sender_uri = FrozenURI.parse('xmpp:'+msg['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) + recipient = Identity(recipient_uri) + type = msg.getAttribute('type') + + if type == 'error': + error_type = msg.error['type'] + conditions = [(child.name, child.defaultUri) for child in msg.error.children] + error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) + notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) + return + + if type in (None, 'normal', 'chat') and msg.body is not None: + if msg.html is not None: + content_type = 'text/html' + body = msg.html.toXml() + else: + content_type = 'text/plain' + body = unicode(msg.body) + use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS + if type == 'chat': + message = ChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) + notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + else: + message = NormalMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) + notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + return + + if type == 'chat' and msg.body is None: + # Check if it's a composing indication + for attr in ('active', 'inactive', 'composing', 'paused', 'gone'): + attr_obj = getattr(msg, attr, None) + if attr_obj is not None and attr_obj.defaultUri == CHATSTATES_NS: + state = attr + break + else: + state = None + if state is not None: + composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) + notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) + return + + # Check if it's a receipt acknowledgement + if msg.body is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: + receipt_id = msg.getAttribute('id', None) + if receipt_id is not None: + receipt = MessageReceipt(sender, recipient, receipt_id) + notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) + + +class PresenceProtocol(_PresenceProtocol): + + def availableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + show = stanza.show + statuses = stanza.statuses + presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def unavailableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def _process_subscription_stanza(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + type = stanza.element.getAttribute('type') + presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def subscribedReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def unsubscribedReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def subscribeReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def unsubscribeReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def probeReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = ProbePresence(sender, recipient, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + +# Sessions + +class XMPPChatSession(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.pending_receipts = {} + self.channel = coros.queue() + self._proc = None + self.xmpp_manager = XMPPManager() + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + self.state = 'started' + + def end(self): + self.send_composing_indication('gone') + self._clear_pending_receipts() + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + + def send_message(self, body, content_type='text/plain', message_id=None, use_receipt=True): + message = ChatMessage(self.local_identity, self.remote_identity, body, content_type, id=message_id, use_receipt=use_receipt) + self.xmpp_manager.send_stanza(message) + if message_id is not None: + timer = reactor.callLater(30, self._receipt_timer_expired, message_id) + self.pending_receipts[message_id] = timer + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + + def send_composing_indication(self, state, message_id=None, use_receipt=False): + message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) + self.xmpp_manager.send_stanza(message) + if message_id is not None: + timer = reactor.callLater(30, self._receipt_timer_expired, message_id) + self.pending_receipts[message_id] = timer + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + + def send_receipt_acknowledgement(self, receipt_id): + message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) + self.xmpp_manager.send_stanza(message) + + def send_error(self, stanza, error_type, conditions): + message = ErrorStanza.from_stanza(stanza, error_type, conditions) + self.xmpp_manager.send_stanza(message) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, ChatMessage): + notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) + elif isinstance(item, ChatComposingIndication): + if item.state == 'gone': + self._clear_pending_receipts() + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='remote')) + self.state = 'terminated' + break + else: + notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=TimestampedNotificationData(message=item)) + elif isinstance(item, MessageReceipt): + if item.receipt_id in self.pending_receipts: + timer = self.pending_receipts.pop(item.receipt_id) + timer.cancel() + notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.receipt_id)) + elif isinstance(item, ErrorStanza): + if item.id in self.pending_receipts: + timer = self.pending_receipts.pop(item.id) + timer.cancel() + # TODO: translate cause + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.id, code=503, reason='Service Unavailable')) + self._proc = None + + def _receipt_timer_expired(self, message_id): + self.pending_receipts.pop(message_id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + + def _clear_pending_receipts(self): + notification_center = NotificationCenter() + while self.pending_receipts: + message_id, timer = self.pending_receipts.popitem() + timer.cancel() + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + + +class XMPPChatSessionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.sessions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='XMPPChatSessionDidStart') + notification_center.add_observer(self, name='XMPPChatSessionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='XMPPChatSessionDidStart') + notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_XMPPChatSessionDidStart(self, notification): + session = notification.sender + self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session + + def _NH_XMPPChatSessionDidEnd(self, notification): + session = notification.sender + del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] + + +# Subscriptions + +class XMPPSubscription(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.channel = coros.queue() + self._proc = None + self.xmpp_manager = XMPPManager() + + def _set_state(self, new_state): + prev_state = self.__dict__.get('state', None) + self.__dict__['state'] = new_state + if prev_state != new_state: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + def _get_state(self): + return self.__dict__['state'] + state = property(_get_state, _set_state) + del _get_state, _set_state + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + self.subscribe() + + def end(self): + if self.state == 'terminated': + return + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + + def subscribe(self): + self.state = 'subscribe_sent' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') + self.xmpp_manager.send_stanza(stanza) + # If we are already subscribed we may not receive an answer, send a probe just in case + self._send_probe() + + def unsubscribe(self): + self.state = 'unsubscribe_sent' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') + self.xmpp_manager.send_stanza(stanza) + + def _send_probe(self): + self.state = 'subscribe_sent' + stanza = ProbePresence(self.local_identity, self.remote_identity) + self.xmpp_manager.send_stanza(stanza) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, AvailabilityPresence): + if self.state == 'subscribe_sent': + self.state == 'active' + notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=TimestampedNotificationData(presence=item)) + elif isinstance(item, SubscriptionPresence): + if self.state == 'subscribe_sent' and item.type == 'subscribed': + self.state = 'active' + elif item.type == 'unsubscribed': + prev_state = self.state + self.state = 'terminated' + if prev_state in ('active', 'unsubscribe_sent'): + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData()) + else: + notification_center.post_notification('XMPPSubscriptionDidFail', sender=self, data=TimestampedNotificationData()) + break + self._proc = None + + +class XMPPIncomingSubscription(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.channel = coros.queue() + self._proc = None + self.xmpp_manager = XMPPManager() + + def _set_state(self, new_state): + prev_state = self.__dict__.get('state', None) + self.__dict__['state'] = new_state + if prev_state != new_state: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + def _get_state(self): + return self.__dict__['state'] + state = property(_get_state, _set_state) + del _get_state, _set_state + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + + def end(self): + if self.state == 'terminated': + return + self.state = 'terminated' + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + + def accept(self): + self.state = 'active' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') + self.xmpp_manager.send_stanza(stanza) + + def reject(self): + self.state = 'terminating' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') + self.xmpp_manager.send_stanza(stanza) + self.end() + + def send_presence(self, stanza): + self.xmpp_manager.send_stanza(stanza) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, SubscriptionPresence): + if item.type == 'subscribe': + notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self, data=TimestampedNotificationData()) + elif item.type == 'unsubscribe': + self.state = 'terminated' + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + break + elif isinstance(item, ProbePresence): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self, data=TimestampedNotificationData()) + self._proc = None + + +class XMPPSubscriptionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.incoming_subscriptions = {} + self.outgoing_subscriptions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='XMPPSubscriptionDidStart') + notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') + notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') + notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') + notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') + notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') + notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_XMPPSubscriptionDidStart(self, notification): + subscription = notification.sender + self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription + + def _NH_XMPPSubscriptionDidEnd(self, notification): + subscription = notification.sender + del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] + + def _NH_XMPPIncomingSubscriptionDidStart(self, notification): + subscription = notification.sender + self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription + + def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): + subscription = notification.sender + del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] + + +# Utility classes + +class Router(_Router): + def route(self, stanza): + """ + Route a stanza. (subclassed to avoid vebose logging) + + @param stanza: The stanza to be routed. + @type stanza: L{domish.Element}. + """ + destination = JID(stanza['to']) + + if destination.host in self.routes: + self.routes[destination.host].send(stanza) + else: + self.routes[None].send(stanza) + + +class XMPPS2SServerFactory(_XMPPS2SServerFactory): + def onConnectionMade(self, xs): + super(XMPPS2SServerFactory, self).onConnectionMade(xs) + + def logDataIn(buf): + buf = buf.strip() + if buf: + xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + + def logDataOut(buf): + buf = buf.strip() + if buf: + xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + + if XMPPGatewayConfig.trace_xmpp: + xs.rawDataInFn = logDataIn + xs.rawDataOutFn = logDataOut + + +class DeferredS2SClientFactory(_DeferredS2SClientFactory): + def onConnectionMade(self, xs): + super(DeferredS2SClientFactory, self).onConnectionMade(xs) + + def logDataIn(buf): + if buf: + xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + + def logDataOut(buf): + if buf: + xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + + if XMPPGatewayConfig.trace_xmpp: + xs.rawDataInFn = logDataIn + xs.rawDataOutFn = logDataOut + +# Patch Wokkel's DeferredS2SClientFactory to use our logger +import wokkel.server +wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory +del wokkel.server + +# Manager + +class XMPPManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + config = XMPPGatewayConfig + + self.stopped = False + + router = Router() + self._server_service = ServerService(router) + self._server_service.domains = set(config.domains) + self._server_service.logTraffic = False # done manually + + self._s2s_factory = XMPPS2SServerFactory(self._server_service) + self._s2s_factory.logTraffic = False # done manually + + self._internal_component = InternalComponent(router) + self._internal_component.domains = set(config.domains) + + self._message_protocol = MessageProtocol() + self._message_protocol.setHandlerParent(self._internal_component) + + self._presence_protocol = PresenceProtocol() + self._presence_protocol.setHandlerParent(self._internal_component) + + self._s2s_listener = None + + self.chat_session_manager = XMPPChatSessionManager() + self.subscription_manager = XMPPSubscriptionManager() + + def start(self): + self.stopped = False + xmpp_logger.start() + config = XMPPGatewayConfig + self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) + self.chat_session_manager.start() + self.subscription_manager.start() + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self._internal_component) + self._internal_component.startService() + + def stop(self): + self.stopped = True + self._s2s_listener.stopListening() + self.subscription_manager.stop() + self.chat_session_manager.stop() + self._internal_component.stopService() + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._internal_component) + xmpp_logger.stop() + + def send_stanza(self, stanza): + if self.stopped: + return + self._internal_component.send(stanza.to_xml_element()) + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + # Process message stanzas + + def _NH_XMPPGotChatMessage(self, notification): + message = notification.data.message + try: + session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) + else: + session.channel.send(message) + + def _NH_XMPPGotNormalMessage(self, notification): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) + + def _NH_XMPPGotComposingIndication(self, notification): + notification_center = NotificationCenter() + composing_indication = notification.data.composing_indication + try: + session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] + except KeyError: + notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) + else: + session.channel.send(composing_indication) + + def _NH_XMPPGotErrorMessage(self, notification): + error_message = notification.data.error_message + try: + session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) + else: + session.channel.send(error_message) + + def _NH_XMPPGotReceipt(self, notification): + receipt = notification.data.receipt + try: + session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] + except KeyError: + pass + else: + session.channel.send(receipt) + + # Process presence stanzas + + def _NH_XMPPGotPresenceAvailability(self, notification): + stanza = notification.data.presence_stanza + if stanza.recipient.uri.resource is not None: + # Skip directed presence + return + sender_uri = stanza.sender.uri + sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) + try: + subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] + except KeyError: + # Ignore incoming presence stanzas if there is no subscription + pass + else: + subscription.channel.send(stanza) + + def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): + stanza = notification.data.presence_stanza + if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: + # Skip directed presence + return + if stanza.type in ('subscribed', 'unsubscribed'): + try: + subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] + except KeyError: + pass + else: + subscription.channel.send(stanza) + elif stanza.type in ('subscribe', 'unsubscribe'): + try: + subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] + except KeyError: + if stanza.type == 'subscribe': + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + subscription.channel.send(stanza) + + def _NH_XMPPGotPresenceProbe(self, notification): + stanza = notification.data.presence_stanza + if stanza.recipient.uri.resource is not None: + # Skip directed presence + return + sender_uri = stanza.sender.uri + sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) + try: + subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + subscription.channel.send(stanza) + + diff --git a/xmppgateway.ini.sample b/xmppgateway.ini.sample new file mode 100644 index 0000000..50caa73 --- /dev/null +++ b/xmppgateway.ini.sample @@ -0,0 +1,23 @@ + +[general] +; The following settings are the default used by the software, uncomment +; them only if you want to make changes + +; trace_xmpp = False + +; IP address used for listening to XMPP connections; empty string or any means listen on interface used +; by the default route +; local_ip = + +; local_port = 5269 + +; Comma-separated list of domains for which this server is responsible +; domains = + +; If set to True (default) MSRP will be used to translate XMPP Instant Messaging, else SIP MESSAGE will be used +; Note: XMPP 'normal' messages (not chat messages) are always translated to SIP MESSAGE requests +; use_msrp_for_chat = True + +; Timeout to terminate a SIP session if no chat traffic was received +; sip_session_timeout = 600 +