diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index 645ac46..9013078 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,351 +1,352 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler -from sylk.applications.xmppgateway.xmpp import ChatMessage, ChatComposingIndication, NormalMessage, XMPPChatSession from sylk.applications.xmppgateway.xmpp import XMPPManager +from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession +from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) class XMPPGatewayApplication(object): __metaclass__ = SylkApplication implements(ISylkApplication, IObserver) __appname__ = 'xmppgateway' def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.xmpp_manager) self.xmpp_manager.start() def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_manager) self.xmpp_manager.stop() def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) try: msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() except StopIteration: session.reject(488, 'Only MSRP media is supported') return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): pass xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session def incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'presence': subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) def incoming_sip_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null)[0] from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.msg('New incoming SIP MESSAGE from %s' % from_header.uri) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMMessage.parse(data.body) except CPIMParserError: message_request.answer(400) return else: body = cpim_message.body content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, content_type, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, content_type, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.xmpp_message_queue.append(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.xmpp_message_queue.append(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.xmpp_message_queue.append(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.start() # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.msg('Chat session failed sip:%s <--> xmpp:%s' % (sip_uri, xmpp_uri)) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence session established sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.s2x_presence_subscriptions[(handler.sip_identity.uri, handler.xmpp_identity.uri)] = handler def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender log.msg('Presence session ended sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_presence_subscriptions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender log.msg('Presence session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index 8ebeda0..bdea164 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,445 +1,446 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlet import coros from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource -from sylk.applications.xmppgateway.xmpp import ChatMessage, XMPPChatSession from sylk.applications.xmppgateway.xmpp import XMPPManager +from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession +from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.extensions import ChatStream from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self.xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: notification_center = NotificationCenter() notification_center.post_notification('ChatSessionDidStart', sender=self, data=TimestampedNotificationData()) self._send_queued_messages() def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) def _get_xmpp_session(self): return self.__dict__['xmpp_session'] xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) return self.msrp_stream = ChatStream(account) route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = ServerSession(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self, data=TimestampedNotificationData()) else: notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) def _send_queued_messages(self): if self.xmpp_message_queue: while self.xmpp_message_queue: message = self.xmpp_message_queue.popleft() if message.content_type not in ('text/plain', 'text/html'): continue if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = CPIMIdentity(sender_uri) self.msrp_stream.send_message(message.body, message.content_type, local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _inactivity_timeout(self): log.msg("Ending SIP session %s due to incactivity" % self.sip_session._invitation.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionGotProposal(self, notification): self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type in ('text/plain', 'text/html'): if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(message.body, message.content_type, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, message.body, message.content_type)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' self._pending_xmpp_stanzas[message.id] = message self.msrp_stream.send_message(message.body, message.content_type, local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = message.content_type self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.get_uri()) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index 5ba1ded..044897c 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,480 +1,481 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import hashlib import os import random import urllib from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlet import coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import TimestampedNotificationData from time import time from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource -from sylk.applications.xmppgateway.xmpp import AvailabilityPresence, XMPPSubscription, XMPPIncomingSubscription +from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence +from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) def add_sip_subscription(self, subscription): self._sip_subscriptions.append(subscription) notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None subscription.accept(content_type, pidf_doc) else: subscription.accept_pending() def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = self._stanza_cache.iterkeys().next() person = pidf.Person("ID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'extended-away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': stanza.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: stanza.extended = 'available' resource = encode_resource(stanza.sender.uri.resource) tuple_id = "ID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource contact = pidf.Contact(str(sip_uri)) tuple = pidf.Service(tuple_id, status=status, contact=contact) tuple.add(pidf.DeviceID(resource)) tuple.device_info = pidf.DeviceInfo(resource, description=urllib.quote(stanza.sender.uri.resource.encode('utf-8'))) for lang, note in stanza.statuses.iteritems(): tuple.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(tuple) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_twisted_thread def _NH_SIPIncomingSubscriptionDidEnd(self, notification): notification_center = NotificationCenter() subscription = notification.sender notification_center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if not self._sip_subscriptions: self.end() def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza pidf_doc = self._build_pidf() for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) except SIPCoreError: pass if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) except ParserError, e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = (p for p in pidf_doc.persons).next() except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = service.device_info.id else: sip_contact.parameters['gr'] = service.id # TODO: pseudorandom thing with AoR? sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'extended-away': stanza.show = 'xa' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' elif set(('holiday', 'vacation')).intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = AccountManager().default_account refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[route] except KeyError: continue subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.get_uri()), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail, e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) except SubscriptionError, e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe diff --git a/sylk/applications/xmppgateway/xmpp.py b/sylk/applications/xmppgateway/xmpp.py deleted file mode 100644 index b6c9adb..0000000 --- a/sylk/applications/xmppgateway/xmpp.py +++ /dev/null @@ -1,868 +0,0 @@ -# Copyright (C) 2012 AG Projects. See LICENSE for details -# - -import os - -from application.notification import IObserver, NotificationCenter -from application.python import Null -from application.python.descriptor import WriteOnceAttribute -from application.python.types import Singleton -from datetime import datetime -from eventlet import coros, proc -from sipsimple.util import Timestamp, TimestampedNotificationData -from twisted.internet import reactor -from twisted.words.protocols.jabber.jid import internJID as JID -from twisted.words.xish import domish -from wokkel.component import InternalComponent, Router as _Router -from wokkel.server import ServerService, XMPPS2SServerFactory as _XMPPS2SServerFactory, DeferredS2SClientFactory as _DeferredS2SClientFactory -from wokkel.xmppim import MessageProtocol as _MessageProtocol, PresenceProtocol as _PresenceProtocol -from zope.interface import implements - -from sylk.applications import ApplicationLogger -from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig -from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI -from sylk.applications.xmppgateway.logger import Logger - -log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) - - -CHATSTATES_NS = 'http://jabber.org/protocol/chatstates' -RECEIPTS_NS = 'urn:xmpp:receipts' -STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas' -XML_NS = 'http://www.w3.org/XML/1998/namespace' - -xmpp_logger = Logger() - -# Datatypes - -class BaseStanza(object): - _stanza_type = None # to be defined by subclasses - - def __init__(self, sender, recipient, id=None, type=None): - self.sender = sender - self.recipient = recipient - self.id = id - self.type = type - - def to_xml_element(self): - xml_element = domish.Element((None, self._stanza_type)) - xml_element['from'] = self.sender.uri.as_string('xmpp') - xml_element['to'] = self.recipient.uri.as_string('xmpp') - if self.type: - xml_element['type'] = self.type - if self.id is not None: - xml_element['id'] = self.id - return xml_element - -class MessageStanza(BaseStanza): - _stanza_type = 'message' - - def __init__(self, sender, recipient, type='', id=None, use_receipt=False): - super(MessageStanza, self).__init__(sender, recipient, id=id, type=type) - self.use_receipt = use_receipt - - def to_xml_element(self): - xml_element = super(MessageStanza, self).to_xml_element() - if self.id is not None and self.recipient.uri.resource is not None and self.use_receipt: - xml_element.addElement('request', defaultUri=RECEIPTS_NS) - return xml_element - -class NormalMessage(MessageStanza): - def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=False): - super(NormalMessage, self).__init__(sender, recipient, type='', id=id, use_receipt=use_receipt) - self.body = body - self.content_type = content_type - - def to_xml_element(self): - xml_element = super(NormalMessage, self).to_xml_element() - xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? - return xml_element - -class ChatMessage(MessageStanza): - def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=True): - super(ChatMessage, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) - self.body = body - self.content_type = content_type - - def to_xml_element(self): - xml_element = super(ChatMessage, self).to_xml_element() - xml_element.addElement('active', defaultUri=CHATSTATES_NS) - xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? - return xml_element - -class ChatComposingIndication(MessageStanza): - def __init__(self, sender, recipient, state, id=None, use_receipt=False): - super(ChatComposingIndication, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) - self.state = state - - def to_xml_element(self): - xml_element = super(ChatComposingIndication, self).to_xml_element() - xml_element.addElement(self.state, defaultUri=CHATSTATES_NS) - return xml_element - -class MessageReceipt(MessageStanza): - def __init__(self, sender, recipient, receipt_id, id=None): - super(MessageReceipt, self).__init__(sender, recipient, type='', id=id, use_receipt=False) - self.receipt_id = receipt_id - - def to_xml_element(self): - xml_element = super(MessageReceipt, self).to_xml_element() - receipt_element = domish.Element((RECEIPTS_NS, 'received')) - receipt_element['id'] = self.receipt_id - xml_element.addChild(receipt_element) - return xml_element - -class PresenceStanza(BaseStanza): - _stanza_type = 'presence' - - def __init__(self, sender, recipient, type=None, id=None): - super(PresenceStanza, self).__init__(sender, recipient, type=type, id=id) - -class AvailabilityPresence(PresenceStanza): - def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None): - super(AvailabilityPresence, self).__init__(sender, recipient, id=id) - self.available = available - self.show = show - self.priority = priority - self.statuses = statuses or {} - - def _get_available(self): - return self.__dict__['available'] - def _set_available(self, available): - if available: - self.type = None - else: - self.type = 'unavailable' - self.__dict__['available'] = available - available = property(_get_available, _set_available) - del _get_available, _set_available - - @property - def status(self): - status = self.statuses.get(None) - if status is None: - try: - status = self.statuses.itervalues().next() - except StopIteration: - pass - return status - - def to_xml_element(self): - xml_element = super(PresenceStanza, self).to_xml_element() - if self.available: - if self.show is not None: - xml_element.addElement('show', content=self.show) - if self.priority != 0: - xml_element.addElement('priority', content=unicode(self.priority)) - for lang, text in self.statuses.iteritems(): - status = xml_element.addElement('status', content=text) - if lang: - status[(XML_NS, 'lang')] = lang - return xml_element - -class SubscriptionPresence(PresenceStanza): - def __init__(self, sender, recipient, type, id=None): - super(SubscriptionPresence, self).__init__(sender, recipient, type=type, id=id) - -class ProbePresence(PresenceStanza): - def __init__(self, sender, recipient, id=None): - super(ProbePresence, self).__init__(sender, recipient, type='probe', id=id) - -class ErrorStanza(object): - """ - Stanza representing an error of another stanza. It's not a base stanza type on its own. - """ - - def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): - self.stanza_type = stanza_type - self.sender = sender - self.recipient = recipient - self.id = id - self.conditions = conditions - self.error_type = error_type - - @classmethod - def from_stanza(cls, stanza, error_type, conditions): - # In error stanzas sender and recipient are swapped - return cls(stanza._stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) - - def to_xml_element(self): - xml_element = domish.Element((None, self.stanza_type)) - xml_element['from'] = self.sender.uri.as_string('xmpp') - xml_element['to'] = self.recipient.uri.as_string('xmpp') - xml_element['type'] = 'error' - if self.id is not None: - xml_element['id'] = self.id - error_element = domish.Element((None, 'error')) - error_element['type'] = self.error_type - [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] - xml_element.addChild(error_element) - return xml_element - - -# Protocols - -class MessageProtocol(_MessageProtocol): - messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' - - def _onMessage(self, message): - if message.handled: - return - messageType = message.getAttribute("type") - if messageType not in self.messageTypes: - message["type"] = 'normal' - self.onMessage(message) - - def onMessage(self, msg): - notification_center = NotificationCenter() - - sender_uri = FrozenURI.parse('xmpp:'+msg['from']) - sender = Identity(sender_uri) - recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) - recipient = Identity(recipient_uri) - type = msg.getAttribute('type') - - if type == 'error': - error_type = msg.error['type'] - conditions = [(child.name, child.defaultUri) for child in msg.error.children] - error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) - notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) - return - - if type in (None, 'normal', 'chat') and msg.body is not None: - if msg.html is not None: - content_type = 'text/html' - body = msg.html.toXml() - else: - content_type = 'text/plain' - body = unicode(msg.body) - use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS - if type == 'chat': - message = ChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) - notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) - else: - message = NormalMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) - notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) - return - - if type == 'chat' and msg.body is None: - # Check if it's a composing indication - for attr in ('active', 'inactive', 'composing', 'paused', 'gone'): - attr_obj = getattr(msg, attr, None) - if attr_obj is not None and attr_obj.defaultUri == CHATSTATES_NS: - state = attr - break - else: - state = None - if state is not None: - composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) - notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) - return - - # Check if it's a receipt acknowledgement - if msg.body is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: - receipt_id = msg.getAttribute('id', None) - if receipt_id is not None: - receipt = MessageReceipt(sender, recipient, receipt_id) - notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) - - -class PresenceProtocol(_PresenceProtocol): - - def availableReceived(self, stanza): - sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) - sender = Identity(sender_uri) - recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) - recipient = Identity(recipient_uri) - id = stanza.element.getAttribute('id') - show = stanza.show - statuses = stanza.statuses - presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) - - def unavailableReceived(self, stanza): - sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) - sender = Identity(sender_uri) - recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) - recipient = Identity(recipient_uri) - id = stanza.element.getAttribute('id') - presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) - - def _process_subscription_stanza(self, stanza): - sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) - sender = Identity(sender_uri) - recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) - recipient = Identity(recipient_uri) - id = stanza.element.getAttribute('id') - type = stanza.element.getAttribute('type') - presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) - - def subscribedReceived(self, stanza): - self._process_subscription_stanza(stanza) - - def unsubscribedReceived(self, stanza): - self._process_subscription_stanza(stanza) - - def subscribeReceived(self, stanza): - self._process_subscription_stanza(stanza) - - def unsubscribeReceived(self, stanza): - self._process_subscription_stanza(stanza) - - def probeReceived(self, stanza): - sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) - sender = Identity(sender_uri) - recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) - recipient = Identity(recipient_uri) - id = stanza.element.getAttribute('id') - presence_stanza = ProbePresence(sender, recipient, id=id) - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) - - -# Sessions - -class XMPPChatSession(object): - local_identity = WriteOnceAttribute() - remote_identity = WriteOnceAttribute() - - def __init__(self, local_identity, remote_identity): - self.local_identity = local_identity - self.remote_identity = remote_identity - self.state = None - self.pending_receipts = {} - self.channel = coros.queue() - self._proc = None - self.xmpp_manager = XMPPManager() - - def start(self): - notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidStart', sender=self, data=TimestampedNotificationData()) - self._proc = proc.spawn(self._run) - self.state = 'started' - - def end(self): - self.send_composing_indication('gone') - self._clear_pending_receipts() - self._proc.kill() - self._proc = None - notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) - self.state = 'terminated' - - def send_message(self, body, content_type='text/plain', message_id=None, use_receipt=True): - message = ChatMessage(self.local_identity, self.remote_identity, body, content_type, id=message_id, use_receipt=use_receipt) - self.xmpp_manager.send_stanza(message) - if message_id is not None: - timer = reactor.callLater(30, self._receipt_timer_expired, message_id) - self.pending_receipts[message_id] = timer - notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) - - def send_composing_indication(self, state, message_id=None, use_receipt=False): - message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) - self.xmpp_manager.send_stanza(message) - if message_id is not None: - timer = reactor.callLater(30, self._receipt_timer_expired, message_id) - self.pending_receipts[message_id] = timer - notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) - - def send_receipt_acknowledgement(self, receipt_id): - message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) - self.xmpp_manager.send_stanza(message) - - def send_error(self, stanza, error_type, conditions): - message = ErrorStanza.from_stanza(stanza, error_type, conditions) - self.xmpp_manager.send_stanza(message) - - def _run(self): - notification_center = NotificationCenter() - while True: - item = self.channel.wait() - if isinstance(item, ChatMessage): - notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) - elif isinstance(item, ChatComposingIndication): - if item.state == 'gone': - self._clear_pending_receipts() - notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='remote')) - self.state = 'terminated' - break - else: - notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=TimestampedNotificationData(message=item)) - elif isinstance(item, MessageReceipt): - if item.receipt_id in self.pending_receipts: - timer = self.pending_receipts.pop(item.receipt_id) - timer.cancel() - notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.receipt_id)) - elif isinstance(item, ErrorStanza): - if item.id in self.pending_receipts: - timer = self.pending_receipts.pop(item.id) - timer.cancel() - # TODO: translate cause - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.id, code=503, reason='Service Unavailable')) - self._proc = None - - def _receipt_timer_expired(self, message_id): - self.pending_receipts.pop(message_id) - notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) - - def _clear_pending_receipts(self): - notification_center = NotificationCenter() - while self.pending_receipts: - message_id, timer = self.pending_receipts.popitem() - timer.cancel() - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) - - -class XMPPChatSessionManager(object): - __metaclass__ = Singleton - implements(IObserver) - - def __init__(self): - self.sessions = {} - - def start(self): - notification_center = NotificationCenter() - notification_center.add_observer(self, name='XMPPChatSessionDidStart') - notification_center.add_observer(self, name='XMPPChatSessionDidEnd') - - def stop(self): - notification_center = NotificationCenter() - notification_center.remove_observer(self, name='XMPPChatSessionDidStart') - notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') - - def handle_notification(self, notification): - handler = getattr(self, '_NH_%s' % notification.name, Null) - handler(notification) - - def _NH_XMPPChatSessionDidStart(self, notification): - session = notification.sender - self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session - - def _NH_XMPPChatSessionDidEnd(self, notification): - session = notification.sender - del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] - - -# Subscriptions - -class XMPPSubscription(object): - local_identity = WriteOnceAttribute() - remote_identity = WriteOnceAttribute() - - def __init__(self, local_identity, remote_identity): - self.local_identity = local_identity - self.remote_identity = remote_identity - self.state = None - self.channel = coros.queue() - self._proc = None - self.xmpp_manager = XMPPManager() - - def _set_state(self, new_state): - prev_state = self.__dict__.get('state', None) - self.__dict__['state'] = new_state - if prev_state != new_state: - notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) - def _get_state(self): - return self.__dict__['state'] - state = property(_get_state, _set_state) - del _get_state, _set_state - - def start(self): - notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) - self._proc = proc.spawn(self._run) - self.subscribe() - - def end(self): - if self.state == 'terminated': - return - self._proc.kill() - self._proc = None - notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) - self.state = 'terminated' - - def subscribe(self): - self.state = 'subscribe_sent' - stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') - self.xmpp_manager.send_stanza(stanza) - # If we are already subscribed we may not receive an answer, send a probe just in case - self._send_probe() - - def unsubscribe(self): - self.state = 'unsubscribe_sent' - stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') - self.xmpp_manager.send_stanza(stanza) - - def _send_probe(self): - self.state = 'subscribe_sent' - stanza = ProbePresence(self.local_identity, self.remote_identity) - self.xmpp_manager.send_stanza(stanza) - - def _run(self): - notification_center = NotificationCenter() - while True: - item = self.channel.wait() - if isinstance(item, AvailabilityPresence): - if self.state == 'subscribe_sent': - self.state == 'active' - notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=TimestampedNotificationData(presence=item)) - elif isinstance(item, SubscriptionPresence): - if self.state == 'subscribe_sent' and item.type == 'subscribed': - self.state = 'active' - elif item.type == 'unsubscribed': - prev_state = self.state - self.state = 'terminated' - if prev_state in ('active', 'unsubscribe_sent'): - notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData()) - else: - notification_center.post_notification('XMPPSubscriptionDidFail', sender=self, data=TimestampedNotificationData()) - break - self._proc = None - - -class XMPPIncomingSubscription(object): - local_identity = WriteOnceAttribute() - remote_identity = WriteOnceAttribute() - - def __init__(self, local_identity, remote_identity): - self.local_identity = local_identity - self.remote_identity = remote_identity - self.state = None - self.channel = coros.queue() - self._proc = None - self.xmpp_manager = XMPPManager() - - def _set_state(self, new_state): - prev_state = self.__dict__.get('state', None) - self.__dict__['state'] = new_state - if prev_state != new_state: - notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) - def _get_state(self): - return self.__dict__['state'] - state = property(_get_state, _set_state) - del _get_state, _set_state - - def start(self): - notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) - self._proc = proc.spawn(self._run) - - def end(self): - if self.state == 'terminated': - return - self.state = 'terminated' - self._proc.kill() - self._proc = None - notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) - - def accept(self): - self.state = 'active' - stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') - self.xmpp_manager.send_stanza(stanza) - - def reject(self): - self.state = 'terminating' - stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') - self.xmpp_manager.send_stanza(stanza) - self.end() - - def send_presence(self, stanza): - self.xmpp_manager.send_stanza(stanza) - - def _run(self): - notification_center = NotificationCenter() - while True: - item = self.channel.wait() - if isinstance(item, SubscriptionPresence): - if item.type == 'subscribe': - notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self, data=TimestampedNotificationData()) - elif item.type == 'unsubscribe': - self.state = 'terminated' - notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self, data=TimestampedNotificationData()) - notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) - break - elif isinstance(item, ProbePresence): - notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self, data=TimestampedNotificationData()) - self._proc = None - - -class XMPPSubscriptionManager(object): - __metaclass__ = Singleton - implements(IObserver) - - def __init__(self): - self.incoming_subscriptions = {} - self.outgoing_subscriptions = {} - - def start(self): - notification_center = NotificationCenter() - notification_center.add_observer(self, name='XMPPSubscriptionDidStart') - notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') - notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') - notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') - - def stop(self): - notification_center = NotificationCenter() - notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') - notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') - notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') - notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') - - def handle_notification(self, notification): - handler = getattr(self, '_NH_%s' % notification.name, Null) - handler(notification) - - def _NH_XMPPSubscriptionDidStart(self, notification): - subscription = notification.sender - self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription - - def _NH_XMPPSubscriptionDidEnd(self, notification): - subscription = notification.sender - del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] - - def _NH_XMPPIncomingSubscriptionDidStart(self, notification): - subscription = notification.sender - self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription - - def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): - subscription = notification.sender - del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] - - -# Utility classes - -class Router(_Router): - def route(self, stanza): - """ - Route a stanza. (subclassed to avoid vebose logging) - - @param stanza: The stanza to be routed. - @type stanza: L{domish.Element}. - """ - destination = JID(stanza['to']) - - if destination.host in self.routes: - self.routes[destination.host].send(stanza) - else: - self.routes[None].send(stanza) - - -class XMPPS2SServerFactory(_XMPPS2SServerFactory): - def onConnectionMade(self, xs): - super(XMPPS2SServerFactory, self).onConnectionMade(xs) - - def logDataIn(buf): - buf = buf.strip() - if buf: - xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) - - def logDataOut(buf): - buf = buf.strip() - if buf: - xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) - - if XMPPGatewayConfig.trace_xmpp: - xs.rawDataInFn = logDataIn - xs.rawDataOutFn = logDataOut - - -class DeferredS2SClientFactory(_DeferredS2SClientFactory): - def onConnectionMade(self, xs): - super(DeferredS2SClientFactory, self).onConnectionMade(xs) - - def logDataIn(buf): - if buf: - xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) - - def logDataOut(buf): - if buf: - xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) - - if XMPPGatewayConfig.trace_xmpp: - xs.rawDataInFn = logDataIn - xs.rawDataOutFn = logDataOut - -# Patch Wokkel's DeferredS2SClientFactory to use our logger -import wokkel.server -wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory -del wokkel.server - -# Manager - -class XMPPManager(object): - __metaclass__ = Singleton - implements(IObserver) - - def __init__(self): - config = XMPPGatewayConfig - - self.stopped = False - - router = Router() - self._server_service = ServerService(router) - self._server_service.domains = set(config.domains) - self._server_service.logTraffic = False # done manually - - self._s2s_factory = XMPPS2SServerFactory(self._server_service) - self._s2s_factory.logTraffic = False # done manually - - self._internal_component = InternalComponent(router) - self._internal_component.domains = set(config.domains) - - self._message_protocol = MessageProtocol() - self._message_protocol.setHandlerParent(self._internal_component) - - self._presence_protocol = PresenceProtocol() - self._presence_protocol.setHandlerParent(self._internal_component) - - self._s2s_listener = None - - self.chat_session_manager = XMPPChatSessionManager() - self.subscription_manager = XMPPSubscriptionManager() - - def start(self): - self.stopped = False - xmpp_logger.start() - config = XMPPGatewayConfig - self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) - listen_address = self._s2s_listener.getHost() - log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) - self.chat_session_manager.start() - self.subscription_manager.start() - notification_center = NotificationCenter() - notification_center.add_observer(self, sender=self._internal_component) - self._internal_component.startService() - - def stop(self): - self.stopped = True - self._s2s_listener.stopListening() - self.subscription_manager.stop() - self.chat_session_manager.stop() - self._internal_component.stopService() - notification_center = NotificationCenter() - notification_center.remove_observer(self, sender=self._internal_component) - xmpp_logger.stop() - - def send_stanza(self, stanza): - if self.stopped: - return - self._internal_component.send(stanza.to_xml_element()) - - def handle_notification(self, notification): - handler = getattr(self, '_NH_%s' % notification.name, Null) - handler(notification) - - # Process message stanzas - - def _NH_XMPPGotChatMessage(self, notification): - message = notification.data.message - try: - session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] - except KeyError: - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) - else: - session.channel.send(message) - - def _NH_XMPPGotNormalMessage(self, notification): - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) - - def _NH_XMPPGotComposingIndication(self, notification): - notification_center = NotificationCenter() - composing_indication = notification.data.composing_indication - try: - session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] - except KeyError: - notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) - else: - session.channel.send(composing_indication) - - def _NH_XMPPGotErrorMessage(self, notification): - error_message = notification.data.error_message - try: - session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] - except KeyError: - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) - else: - session.channel.send(error_message) - - def _NH_XMPPGotReceipt(self, notification): - receipt = notification.data.receipt - try: - session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] - except KeyError: - pass - else: - session.channel.send(receipt) - - # Process presence stanzas - - def _NH_XMPPGotPresenceAvailability(self, notification): - stanza = notification.data.presence_stanza - if stanza.recipient.uri.resource is not None: - # Skip directed presence - return - sender_uri = stanza.sender.uri - sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) - try: - subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] - except KeyError: - # Ignore incoming presence stanzas if there is no subscription - pass - else: - subscription.channel.send(stanza) - - def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): - stanza = notification.data.presence_stanza - if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: - # Skip directed presence - return - if stanza.type in ('subscribed', 'unsubscribed'): - try: - subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] - except KeyError: - pass - else: - subscription.channel.send(stanza) - elif stanza.type in ('subscribe', 'unsubscribe'): - try: - subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] - except KeyError: - if stanza.type == 'subscribe': - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) - else: - subscription.channel.send(stanza) - - def _NH_XMPPGotPresenceProbe(self, notification): - stanza = notification.data.presence_stanza - if stanza.recipient.uri.resource is not None: - # Skip directed presence - return - sender_uri = stanza.sender.uri - sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) - try: - subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] - except KeyError: - notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) - else: - subscription.channel.send(stanza) - - diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py new file mode 100644 index 0000000..3db331d --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -0,0 +1,252 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import os + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.types import Singleton +from datetime import datetime +from sipsimple.util import Timestamp, TimestampedNotificationData +from twisted.internet import reactor +from twisted.words.protocols.jabber.jid import internJID as JID +from wokkel.component import InternalComponent, Router as _Router +from wokkel.server import ServerService, XMPPS2SServerFactory as _XMPPS2SServerFactory, DeferredS2SClientFactory as _DeferredS2SClientFactory +from zope.interface import implements + +from sylk.applications import ApplicationLogger +from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig +from sylk.applications.xmppgateway.datatypes import FrozenURI +from sylk.applications.xmppgateway.logger import Logger +from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, PresenceProtocol +from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager +from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager + +log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) + + +xmpp_logger = Logger() + + +# Utility classes + +class Router(_Router): + def route(self, stanza): + """ + Route a stanza. (subclassed to avoid vebose logging) + + @param stanza: The stanza to be routed. + @type stanza: L{domish.Element}. + """ + destination = JID(stanza['to']) + + if destination.host in self.routes: + self.routes[destination.host].send(stanza) + else: + self.routes[None].send(stanza) + + +class XMPPS2SServerFactory(_XMPPS2SServerFactory): + def onConnectionMade(self, xs): + super(XMPPS2SServerFactory, self).onConnectionMade(xs) + + def logDataIn(buf): + buf = buf.strip() + if buf: + xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + + def logDataOut(buf): + buf = buf.strip() + if buf: + xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + + if XMPPGatewayConfig.trace_xmpp: + xs.rawDataInFn = logDataIn + xs.rawDataOutFn = logDataOut + + +class DeferredS2SClientFactory(_DeferredS2SClientFactory): + def onConnectionMade(self, xs): + super(DeferredS2SClientFactory, self).onConnectionMade(xs) + + def logDataIn(buf): + if buf: + xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + + def logDataOut(buf): + if buf: + xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + + if XMPPGatewayConfig.trace_xmpp: + xs.rawDataInFn = logDataIn + xs.rawDataOutFn = logDataOut + +# Patch Wokkel's DeferredS2SClientFactory to use our logger +import wokkel.server +wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory +del wokkel.server + +# Manager + +class XMPPManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + config = XMPPGatewayConfig + + self.stopped = False + + router = Router() + self._server_service = ServerService(router) + self._server_service.domains = set(config.domains) + self._server_service.logTraffic = False # done manually + + self._s2s_factory = XMPPS2SServerFactory(self._server_service) + self._s2s_factory.logTraffic = False # done manually + + self._internal_component = InternalComponent(router) + self._internal_component.domains = set(config.domains) + + self._message_protocol = MessageProtocol() + self._message_protocol.setHandlerParent(self._internal_component) + + self._presence_protocol = PresenceProtocol() + self._presence_protocol.setHandlerParent(self._internal_component) + + self._s2s_listener = None + + self.chat_session_manager = XMPPChatSessionManager() + self.subscription_manager = XMPPSubscriptionManager() + + def start(self): + self.stopped = False + xmpp_logger.start() + config = XMPPGatewayConfig + self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) + listen_address = self._s2s_listener.getHost() + log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) + self.chat_session_manager.start() + self.subscription_manager.start() + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self._internal_component) + self._internal_component.startService() + + def stop(self): + self.stopped = True + self._s2s_listener.stopListening() + self.subscription_manager.stop() + self.chat_session_manager.stop() + self._internal_component.stopService() + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._internal_component) + xmpp_logger.stop() + + def send_stanza(self, stanza): + if self.stopped: + return + self._internal_component.send(stanza.to_xml_element()) + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + # Process message stanzas + + def _NH_XMPPGotChatMessage(self, notification): + message = notification.data.message + try: + session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) + else: + session.channel.send(message) + + def _NH_XMPPGotNormalMessage(self, notification): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) + + def _NH_XMPPGotComposingIndication(self, notification): + notification_center = NotificationCenter() + composing_indication = notification.data.composing_indication + try: + session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] + except KeyError: + notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) + else: + session.channel.send(composing_indication) + + def _NH_XMPPGotErrorMessage(self, notification): + error_message = notification.data.error_message + try: + session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) + else: + session.channel.send(error_message) + + def _NH_XMPPGotReceipt(self, notification): + receipt = notification.data.receipt + try: + session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] + except KeyError: + pass + else: + session.channel.send(receipt) + + # Process presence stanzas + + def _NH_XMPPGotPresenceAvailability(self, notification): + stanza = notification.data.presence_stanza + if stanza.recipient.uri.resource is not None: + # Skip directed presence + return + sender_uri = stanza.sender.uri + sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) + try: + subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] + except KeyError: + # Ignore incoming presence stanzas if there is no subscription + pass + else: + subscription.channel.send(stanza) + + def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): + stanza = notification.data.presence_stanza + if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: + # Skip directed presence + return + if stanza.type in ('subscribed', 'unsubscribed'): + try: + subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] + except KeyError: + pass + else: + subscription.channel.send(stanza) + elif stanza.type in ('subscribe', 'unsubscribe'): + try: + subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] + except KeyError: + if stanza.type == 'subscribe': + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + subscription.channel.send(stanza) + + def _NH_XMPPGotPresenceProbe(self, notification): + stanza = notification.data.presence_stanza + if stanza.recipient.uri.resource is not None: + # Skip directed presence + return + sender_uri = stanza.sender.uri + sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) + try: + subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] + except KeyError: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + subscription.channel.send(stanza) + diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py new file mode 100644 index 0000000..3756d07 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -0,0 +1,137 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from application.notification import NotificationCenter +from sipsimple.util import TimestampedNotificationData +from wokkel.xmppim import MessageProtocol as _MessageProtocol, PresenceProtocol as _PresenceProtocol + +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI +from sylk.applications.xmppgateway.xmpp.stanzas import NormalMessage, MessageReceipt, ChatMessage, \ + ChatComposingIndication, ErrorStanza, AvailabilityPresence, SubscriptionPresence, ProbePresence +from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS + + +class MessageProtocol(_MessageProtocol): + messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' + + def _onMessage(self, message): + if message.handled: + return + messageType = message.getAttribute("type") + if messageType not in self.messageTypes: + message["type"] = 'normal' + self.onMessage(message) + + def onMessage(self, msg): + notification_center = NotificationCenter() + + sender_uri = FrozenURI.parse('xmpp:'+msg['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) + recipient = Identity(recipient_uri) + type = msg.getAttribute('type') + + if type == 'error': + error_type = msg.error['type'] + conditions = [(child.name, child.defaultUri) for child in msg.error.children] + error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) + notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) + return + + if type in (None, 'normal', 'chat') and msg.body is not None: + if msg.html is not None: + content_type = 'text/html' + body = msg.html.toXml() + else: + content_type = 'text/plain' + body = unicode(msg.body) + use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS + if type == 'chat': + message = ChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) + notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + else: + message = NormalMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) + notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + return + + if type == 'chat' and msg.body is None: + # Check if it's a composing indication + for attr in ('active', 'inactive', 'composing', 'paused', 'gone'): + attr_obj = getattr(msg, attr, None) + if attr_obj is not None and attr_obj.defaultUri == CHATSTATES_NS: + state = attr + break + else: + state = None + if state is not None: + composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) + notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) + return + + # Check if it's a receipt acknowledgement + if msg.body is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: + receipt_id = msg.getAttribute('id', None) + if receipt_id is not None: + receipt = MessageReceipt(sender, recipient, receipt_id) + notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) + + +class PresenceProtocol(_PresenceProtocol): + + def availableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + show = stanza.show + statuses = stanza.statuses + presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def unavailableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def _process_subscription_stanza(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + type = stanza.element.getAttribute('type') + presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def subscribedReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def unsubscribedReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def subscribeReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def unsubscribeReceived(self, stanza): + self._process_subscription_stanza(stanza) + + def probeReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = ProbePresence(sender, recipient, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + + diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py new file mode 100644 index 0000000..e80eb09 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/session.py @@ -0,0 +1,141 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.descriptor import WriteOnceAttribute +from application.python.types import Singleton +from eventlet import coros, proc +from sipsimple.util import TimestampedNotificationData +from twisted.internet import reactor +from zope.interface import implements + +from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza + +__all__ = ['XMPPChatSession', 'XMPPChatSessionManager'] + + +class XMPPChatSession(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.pending_receipts = {} + self.channel = coros.queue() + self._proc = None + from sylk.applications.xmppgateway.xmpp import XMPPManager + self.xmpp_manager = XMPPManager() + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + self.state = 'started' + + def end(self): + self.send_composing_indication('gone') + self._clear_pending_receipts() + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + + def send_message(self, body, content_type='text/plain', message_id=None, use_receipt=True): + message = ChatMessage(self.local_identity, self.remote_identity, body, content_type, id=message_id, use_receipt=use_receipt) + self.xmpp_manager.send_stanza(message) + if message_id is not None: + timer = reactor.callLater(30, self._receipt_timer_expired, message_id) + self.pending_receipts[message_id] = timer + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + + def send_composing_indication(self, state, message_id=None, use_receipt=False): + message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) + self.xmpp_manager.send_stanza(message) + if message_id is not None: + timer = reactor.callLater(30, self._receipt_timer_expired, message_id) + self.pending_receipts[message_id] = timer + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + + def send_receipt_acknowledgement(self, receipt_id): + message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) + self.xmpp_manager.send_stanza(message) + + def send_error(self, stanza, error_type, conditions): + message = ErrorStanza.from_stanza(stanza, error_type, conditions) + self.xmpp_manager.send_stanza(message) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, ChatMessage): + notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) + elif isinstance(item, ChatComposingIndication): + if item.state == 'gone': + self._clear_pending_receipts() + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='remote')) + self.state = 'terminated' + break + else: + notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=TimestampedNotificationData(message=item)) + elif isinstance(item, MessageReceipt): + if item.receipt_id in self.pending_receipts: + timer = self.pending_receipts.pop(item.receipt_id) + timer.cancel() + notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.receipt_id)) + elif isinstance(item, ErrorStanza): + if item.id in self.pending_receipts: + timer = self.pending_receipts.pop(item.id) + timer.cancel() + # TODO: translate cause + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.id, code=503, reason='Service Unavailable')) + self._proc = None + + def _receipt_timer_expired(self, message_id): + self.pending_receipts.pop(message_id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + + def _clear_pending_receipts(self): + notification_center = NotificationCenter() + while self.pending_receipts: + message_id, timer = self.pending_receipts.popitem() + timer.cancel() + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + + +class XMPPChatSessionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.sessions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='XMPPChatSessionDidStart') + notification_center.add_observer(self, name='XMPPChatSessionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='XMPPChatSessionDidStart') + notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_XMPPChatSessionDidStart(self, notification): + session = notification.sender + self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session + + def _NH_XMPPChatSessionDidEnd(self, notification): + session = notification.sender + del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] + diff --git a/sylk/applications/xmppgateway/xmpp/stanzas.py b/sylk/applications/xmppgateway/xmpp/stanzas.py new file mode 100644 index 0000000..7fcd131 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/stanzas.py @@ -0,0 +1,187 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from twisted.words.xish import domish + + +CHATSTATES_NS = 'http://jabber.org/protocol/chatstates' +RECEIPTS_NS = 'urn:xmpp:receipts' +STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas' +XML_NS = 'http://www.w3.org/XML/1998/namespace' + + +class BaseStanza(object): + _stanza_type = None # to be defined by subclasses + + def __init__(self, sender, recipient, id=None, type=None): + self.sender = sender + self.recipient = recipient + self.id = id + self.type = type + + def to_xml_element(self): + xml_element = domish.Element((None, self._stanza_type)) + xml_element['from'] = self.sender.uri.as_string('xmpp') + xml_element['to'] = self.recipient.uri.as_string('xmpp') + if self.type: + xml_element['type'] = self.type + if self.id is not None: + xml_element['id'] = self.id + return xml_element + + +class MessageStanza(BaseStanza): + _stanza_type = 'message' + + def __init__(self, sender, recipient, type='', id=None, use_receipt=False): + super(MessageStanza, self).__init__(sender, recipient, id=id, type=type) + self.use_receipt = use_receipt + + def to_xml_element(self): + xml_element = super(MessageStanza, self).to_xml_element() + if self.id is not None and self.recipient.uri.resource is not None and self.use_receipt: + xml_element.addElement('request', defaultUri=RECEIPTS_NS) + return xml_element + + +class NormalMessage(MessageStanza): + def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=False): + super(NormalMessage, self).__init__(sender, recipient, type='', id=id, use_receipt=use_receipt) + self.body = body + self.content_type = content_type + + def to_xml_element(self): + xml_element = super(NormalMessage, self).to_xml_element() + xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? + return xml_element + + +class ChatMessage(MessageStanza): + def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=True): + super(ChatMessage, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) + self.body = body + self.content_type = content_type + + def to_xml_element(self): + xml_element = super(ChatMessage, self).to_xml_element() + xml_element.addElement('active', defaultUri=CHATSTATES_NS) + xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? + return xml_element + + +class ChatComposingIndication(MessageStanza): + def __init__(self, sender, recipient, state, id=None, use_receipt=False): + super(ChatComposingIndication, self).__init__(sender, recipient, type='chat', id=id, use_receipt=use_receipt) + self.state = state + + def to_xml_element(self): + xml_element = super(ChatComposingIndication, self).to_xml_element() + xml_element.addElement(self.state, defaultUri=CHATSTATES_NS) + return xml_element + + +class MessageReceipt(MessageStanza): + def __init__(self, sender, recipient, receipt_id, id=None): + super(MessageReceipt, self).__init__(sender, recipient, type='', id=id, use_receipt=False) + self.receipt_id = receipt_id + + def to_xml_element(self): + xml_element = super(MessageReceipt, self).to_xml_element() + receipt_element = domish.Element((RECEIPTS_NS, 'received')) + receipt_element['id'] = self.receipt_id + xml_element.addChild(receipt_element) + return xml_element + + +class PresenceStanza(BaseStanza): + _stanza_type = 'presence' + + def __init__(self, sender, recipient, type=None, id=None): + super(PresenceStanza, self).__init__(sender, recipient, type=type, id=id) + + +class AvailabilityPresence(PresenceStanza): + def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None): + super(AvailabilityPresence, self).__init__(sender, recipient, id=id) + self.available = available + self.show = show + self.priority = priority + self.statuses = statuses or {} + + def _get_available(self): + return self.__dict__['available'] + def _set_available(self, available): + if available: + self.type = None + else: + self.type = 'unavailable' + self.__dict__['available'] = available + available = property(_get_available, _set_available) + del _get_available, _set_available + + @property + def status(self): + status = self.statuses.get(None) + if status is None: + try: + status = self.statuses.itervalues().next() + except StopIteration: + pass + return status + + def to_xml_element(self): + xml_element = super(PresenceStanza, self).to_xml_element() + if self.available: + if self.show is not None: + xml_element.addElement('show', content=self.show) + if self.priority != 0: + xml_element.addElement('priority', content=unicode(self.priority)) + for lang, text in self.statuses.iteritems(): + status = xml_element.addElement('status', content=text) + if lang: + status[(XML_NS, 'lang')] = lang + return xml_element + + +class SubscriptionPresence(PresenceStanza): + def __init__(self, sender, recipient, type, id=None): + super(SubscriptionPresence, self).__init__(sender, recipient, type=type, id=id) + + +class ProbePresence(PresenceStanza): + def __init__(self, sender, recipient, id=None): + super(ProbePresence, self).__init__(sender, recipient, type='probe', id=id) + + +class ErrorStanza(object): + """ + Stanza representing an error of another stanza. It's not a base stanza type on its own. + """ + + def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): + self.stanza_type = stanza_type + self.sender = sender + self.recipient = recipient + self.id = id + self.conditions = conditions + self.error_type = error_type + + @classmethod + def from_stanza(cls, stanza, error_type, conditions): + # In error stanzas sender and recipient are swapped + return cls(stanza._stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) + + def to_xml_element(self): + xml_element = domish.Element((None, self.stanza_type)) + xml_element['from'] = self.sender.uri.as_string('xmpp') + xml_element['to'] = self.recipient.uri.as_string('xmpp') + xml_element['type'] = 'error' + if self.id is not None: + xml_element['id'] = self.id + error_element = domish.Element((None, 'error')) + error_element['type'] = self.error_type + [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] + xml_element.addChild(error_element) + return xml_element + + diff --git a/sylk/applications/xmppgateway/xmpp/subscription.py b/sylk/applications/xmppgateway/xmpp/subscription.py new file mode 100644 index 0000000..5821b48 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/subscription.py @@ -0,0 +1,207 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.descriptor import WriteOnceAttribute +from application.python.types import Singleton +from eventlet import coros, proc +from sipsimple.util import TimestampedNotificationData +from zope.interface import implements + +from sylk.applications.xmppgateway.xmpp.stanzas import SubscriptionPresence, ProbePresence, AvailabilityPresence + +__all__ = ['XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager'] + + +class XMPPSubscription(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.channel = coros.queue() + self._proc = None + from sylk.applications.xmppgateway.xmpp import XMPPManager + self.xmpp_manager = XMPPManager() + + def _set_state(self, new_state): + prev_state = self.__dict__.get('state', None) + self.__dict__['state'] = new_state + if prev_state != new_state: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + def _get_state(self): + return self.__dict__['state'] + state = property(_get_state, _set_state) + del _get_state, _set_state + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + self.subscribe() + + def end(self): + if self.state == 'terminated': + return + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + + def subscribe(self): + self.state = 'subscribe_sent' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') + self.xmpp_manager.send_stanza(stanza) + # If we are already subscribed we may not receive an answer, send a probe just in case + self._send_probe() + + def unsubscribe(self): + self.state = 'unsubscribe_sent' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') + self.xmpp_manager.send_stanza(stanza) + + def _send_probe(self): + self.state = 'subscribe_sent' + stanza = ProbePresence(self.local_identity, self.remote_identity) + self.xmpp_manager.send_stanza(stanza) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, AvailabilityPresence): + if self.state == 'subscribe_sent': + self.state == 'active' + notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=TimestampedNotificationData(presence=item)) + elif isinstance(item, SubscriptionPresence): + if self.state == 'subscribe_sent' and item.type == 'subscribed': + self.state = 'active' + elif item.type == 'unsubscribed': + prev_state = self.state + self.state = 'terminated' + if prev_state in ('active', 'unsubscribe_sent'): + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData()) + else: + notification_center.post_notification('XMPPSubscriptionDidFail', sender=self, data=TimestampedNotificationData()) + break + self._proc = None + + +class XMPPIncomingSubscription(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.channel = coros.queue() + self._proc = None + from sylk.applications.xmppgateway.xmpp import XMPPManager + self.xmpp_manager = XMPPManager() + + def _set_state(self, new_state): + prev_state = self.__dict__.get('state', None) + self.__dict__['state'] = new_state + if prev_state != new_state: + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + def _get_state(self): + return self.__dict__['state'] + state = property(_get_state, _set_state) + del _get_state, _set_state + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + + def end(self): + if self.state == 'terminated': + return + self.state = 'terminated' + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + + def accept(self): + self.state = 'active' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') + self.xmpp_manager.send_stanza(stanza) + + def reject(self): + self.state = 'terminating' + stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') + self.xmpp_manager.send_stanza(stanza) + self.end() + + def send_presence(self, stanza): + self.xmpp_manager.send_stanza(stanza) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, SubscriptionPresence): + if item.type == 'subscribe': + notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self, data=TimestampedNotificationData()) + elif item.type == 'unsubscribe': + self.state = 'terminated' + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + break + elif isinstance(item, ProbePresence): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self, data=TimestampedNotificationData()) + self._proc = None + + +class XMPPSubscriptionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.incoming_subscriptions = {} + self.outgoing_subscriptions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='XMPPSubscriptionDidStart') + notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') + notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') + notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') + notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') + notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') + notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_XMPPSubscriptionDidStart(self, notification): + subscription = notification.sender + self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription + + def _NH_XMPPSubscriptionDidEnd(self, notification): + subscription = notification.sender + del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] + + def _NH_XMPPIncomingSubscriptionDidStart(self, notification): + subscription = notification.sender + self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription + + def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): + subscription = notification.sender + del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] +