diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index 9013078..f027cea 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,352 +1,388 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler +from sylk.applications.xmppgateway.muc import X2SMucHandler from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) class XMPPGatewayApplication(object): __metaclass__ = SylkApplication implements(ISylkApplication, IObserver) __appname__ = 'xmppgateway' def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() + self.s2x_muc_sessions = {} + self.x2s_muc_sessions = {} self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.xmpp_manager) self.xmpp_manager.start() def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_manager) self.xmpp_manager.stop() def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) try: msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() except StopIteration: session.reject(488, 'Only MSRP media is supported') return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): pass xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session def incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'presence': subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) def incoming_sip_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null)[0] from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.msg('New incoming SIP MESSAGE from %s' % from_header.uri) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMMessage.parse(data.body) except CPIMParserError: message_request.answer(400) return else: body = cpim_message.body content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, content_type, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, content_type, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.xmpp_message_queue.append(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.xmpp_message_queue.append(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.xmpp_message_queue.append(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP MESSAGE: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) notification_center = NotificationCenter() notification_center.add_observer(self, sender=handler) handler.start() + def _NH_XMPPGotMucJoinRequest(self, notification): + stanza = notification.data.stanza + muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) + nickname = stanza.recipient.uri.resource + try: + handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)] + except KeyError: + xmpp_identity = stanza.sender + sip_identity = stanza.recipient + sip_identity.uri = muc_uri + handler = X2SMucHandler(sip_identity, xmpp_identity, nickname) + handler._first_stanza = stanza + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=handler) + handler.start() + + def _NH_XMPPGotMucLeaveRequest(self, notification): + # TODO: give error? + pass + # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.msg('Chat session failed sip:%s <--> xmpp:%s' % (sip_uri, xmpp_uri)) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence session established sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.s2x_presence_subscriptions[(handler.sip_identity.uri, handler.xmpp_identity.uri)] = handler def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender log.msg('Presence session ended sip:%s --> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_presence_subscriptions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender log.msg('Presence session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=handler) + # MUC handling + + def _NH_X2SMucHandlerDidStart(self, notification): + handler = notification.sender + log.msg('MUC session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) + self.x2s_muc_sessions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler + + def _NH_X2SMucHandlerDidEnd(self, notification): + handler = notification.sender + log.msg('MUC session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) + self.x2s_muc_sessions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=handler) diff --git a/sylk/applications/xmppgateway/configuration.py b/sylk/applications/xmppgateway/configuration.py index 0e0ee4d..0356901 100644 --- a/sylk/applications/xmppgateway/configuration.py +++ b/sylk/applications/xmppgateway/configuration.py @@ -1,22 +1,23 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.system import host from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import StringList from sipsimple.configuration.datatypes import NonNegativeInteger from sylk.configuration.datatypes import IPAddress, Port class XMPPGatewayConfig(ConfigSection): __cfgfile__ = 'xmppgateway.ini' __section__ = 'general' local_ip = ConfigSetting(type=IPAddress, value=host.default_ip) local_port = ConfigSetting(type=Port, value=5269) trace_xmpp = False domains = ConfigSetting(type=StringList, value=[]) + muc_prefix = 'conference' sip_session_timeout = ConfigSetting(type=NonNegativeInteger, value=600) use_msrp_for_chat = True diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py new file mode 100644 index 0000000..1f1a5a4 --- /dev/null +++ b/sylk/applications/xmppgateway/muc.py @@ -0,0 +1,226 @@ +# Copyright (C) 2012 AG Projects. See LICENSE for details +# + +import os +import uuid + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from application.python.descriptor import WriteOnceAttribute +from sipsimple.account import AccountManager +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import SIPURI +from sipsimple.core import ContactHeader, FromHeader, ToHeader +from sipsimple.lookup import DNSLookup, DNSLookupError +from sipsimple.streams.applications.chat import CPIMIdentity +from sipsimple.threading.green import run_in_green_thread +from sipsimple.util import TimestampedNotificationData +from zope.interface import implements + +from sylk.applications import ApplicationLogger +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource +from sylk.applications.xmppgateway.xmpp import XMPPManager +from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession +from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, STANZAS_NS +from sylk.extensions import ChatStream +from sylk.session import ServerSession + +log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) + + +class X2SMucHandler(object): + implements(IObserver) + + sip_identity = WriteOnceAttribute() + xmpp_identity = WriteOnceAttribute() + + def __init__(self, sip_identity, xmpp_identity, nickname): + self.sip_identity = sip_identity + self.xmpp_identity = xmpp_identity + self.nickname = nickname + self._xmpp_muc_session = None + self._sip_session = None + self._msrp_stream = None + self._first_stanza = None + self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza + self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza + self._participants = set() # set of (URI, nickname) tuples + self.ended = False + + def start(self): + notification_center = NotificationCenter() + self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) + notification_center.add_observer(self, sender=self._xmpp_muc_session) + self._xmpp_muc_session.start() + notification_center.post_notification('X2SMucHandlerDidStart', sender=self, data=TimestampedNotificationData()) + self._start_sip_session() + + def end(self): + if self.ended: + return + notification_center = NotificationCenter() + if self._xmpp_muc_session is not None: + notification_center.remove_observer(self, sender=self._xmpp_muc_session) + self._xmpp_muc_session.end() + self._xmpp_muc_session = None + if self._sip_session is not None: + notification_center.remove_observer(self, sender=self._sip_session) + self._sip_session.end() + self._sip_session = None + self.ended = True + notification_center.post_notification('X2SMucHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + + @run_in_green_thread + def _start_sip_session(self): + notification_center = NotificationCenter() + # self.xmpp_identity is our local identity + from_uri = self.xmpp_identity.uri.as_sip_uri() + del from_uri.parameters['gr'] # no GRUU in From header + contact_uri = self.xmpp_identity.uri.as_sip_uri() + contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) + to_uri = self.sip_identity.uri.as_sip_uri() + lookup = DNSLookup() + settings = SIPSimpleSettings() + account = AccountManager().default_account + if account.sip.outbound_proxy is not None: + uri = SIPURI(host=account.sip.outbound_proxy.host, + port=account.sip.outbound_proxy.port, + parameters={'transport': account.sip.outbound_proxy.transport}) + else: + uri = to_uri + try: + routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() + except DNSLookupError: + log.warning('DNS lookup error while looking for %s proxy' % uri) + notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + return + self._msrp_stream = ChatStream(account) + route = routes.pop(0) + from_header = FromHeader(from_uri) + to_header = ToHeader(to_uri) + contact_header = ContactHeader(contact_uri) + self._sip_session = ServerSession(account) + notification_center.add_observer(self, sender=self._sip_session) + notification_center.add_observer(self, sender=self._msrp_stream) + self._sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self._msrp_stream]) + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPSessionDidStart(self, notification): + log.msg("SIP session (MUC) %s started" % notification.sender._invitation.call_id) + if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: + self.end() + return + message_id = self._msrp_stream.set_local_nickname(self.nickname) + self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) + self._first_stanza = None + + def _NH_SIPSessionDidEnd(self, notification): + log.msg("SIP session (MUC) %s ended" % notification.sender._invitation.call_id) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._sip_session) + notification_center.remove_observer(self, sender=self._msrp_stream) + self._sip_session = None + self._msrp_stream = None + self.end() + + def _NH_SIPSessionDidFail(self, notification): + log.msg("SIP session (MUC) %s failed" % notification.sender._invitation.call_id) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._sip_session) + notification_center.remove_observer(self, sender=self._msrp_stream) + self._sip_session = None + self._msrp_stream = None + self.end() + + def _NH_SIPSessionGotProposal(self, notification): + self._sip_session.reject_proposal() + + def _NH_SIPSessionTransferNewIncoming(self, notification): + self._sip_session.reject_transfer(403) + + def _NH_SIPSessionGotConferenceInfo(self, notification): + # Translate to XMPP payload + xmpp_manager = XMPPManager() + own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) + conference_info = notification.data.conference_info + new_participants = set() + for user in conference_info.users: + user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) + nickname = user.display_text.value if user.display_text else user.entity + new_participants.add((user_uri, nickname)) + if user_uri == own_uri: + continue + sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) + stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) + stanza.jid = Identity(user_uri) + xmpp_manager.send_muc_stanza(stanza) + # Remove participants that are no longer in the room + for uri, nickname in self._participants - new_participants: + sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) + stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) + xmpp_manager.send_muc_stanza(stanza) + self._participants = new_participants + # Send own status last + sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) + stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) + stanza.jid = self.xmpp_identity + stanza.muc_statuses.append('110') + xmpp_manager.send_muc_stanza(stanza) + + def _NH_ChatStreamGotMessage(self, notification): + # Notification is sent by the MSRP stream + message = notification.data.message + content_type = message.content_type.lower() + if content_type in ('text/plain', 'text/html'): + resource = message.sender.display_name or str(message.sender.uri) + sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) + self._xmpp_muc_session.send_message(sender, message.body, message.content_type, message_id=str(uuid.uuid4())) + self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') + + def _NH_ChatStreamDidSetNickname(self, notification): + # Notification is sent by the MSRP stream + nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) + self.nickname = nickname + + def _NH_ChatStreamDidNotSetNickname(self, notification): + # Notification is sent by the MSRP stream + nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) + error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', ['conflict', STANZAS_NS]) + xmpp_manager = XMPPManager() + xmpp_manager.send_muc_stanza(error_stanza) + self.end() + + def _NH_ChatStreamDidDeliverMessage(self, notification): + # Echo back the message to the sender + stanza = self._pending_messages_map.pop(notification.data.message_id) + stanza.sender, stanza.recipient = stanza.recipient, stanza.sender + stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) + xmpp_manager = XMPPManager() + xmpp_manager.send_muc_stanza(stanza) + + def _NH_ChatStreamDidNotDeliverMessage(self, notification): + self._pending_messages_map.pop(notification.data.message_id) + + def _NH_XMPPIncomingMucSessionDidEnd(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self._xmpp_muc_session) + self._xmpp_muc_session = None + self.end() + + def _NH_XMPPIncomingMucSessionGotMessage(self, notification): + message = notification.data.message + sender_uri = message.sender.uri.as_sip_uri() + del sender_uri.parameters['gr'] # no GRUU in CPIM From header + sender = CPIMIdentity(sender_uri, display_name=self.nickname) + message_id = self._msrp_stream.send_message(message.body, message.content_type, local_identity=sender) + self._pending_messages_map[message_id] = message + # Message will be echoed back to the sender on ChatStreamDidDeliverMessage + + def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): + nickname = notification.data.nickname + message_id = self._msrp_stream.set_local_nickname(nickname) + self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) + diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index 23108e2..4b5c6e7 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,254 +1,304 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from datetime import datetime from sipsimple.util import Timestamp, TimestampedNotificationData from twisted.internet import reactor from twisted.words.protocols.jabber.jid import internJID as JID from wokkel.component import InternalComponent, Router as _Router from wokkel.server import ServerService, XMPPS2SServerFactory, DeferredS2SClientFactory from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.logger import Logger -from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, PresenceProtocol -from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager +from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, MUCProtocol, PresenceProtocol +from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) xmpp_logger = Logger() # Utility classes class Router(_Router): def route(self, stanza): """ Route a stanza. (subclassed to avoid vebose logging) @param stanza: The stanza to be routed. @type stanza: L{domish.Element}. """ destination = JID(stanza['to']) if destination.host in self.routes: self.routes[destination.host].send(stanza) else: self.routes[None].send(stanza) class XMPPS2SServerFactory(XMPPS2SServerFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) def logDataOut(buf): buf = buf.strip() if buf: xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut class DeferredS2SClientFactory(DeferredS2SClientFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) def logDataOut(buf): buf = buf.strip() if buf: xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut # Patch Wokkel's DeferredS2SClientFactory to use our logger import wokkel.server wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory del wokkel.server # Manager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False + self.domains = config.domains + self.muc_domains = ['%s.%s' % (config.muc_prefix, domain) for domain in self.domains] + router = Router() self._server_service = ServerService(router) - self._server_service.domains = set(config.domains) + self._server_service.domains = set(self.domains+self.muc_domains) self._server_service.logTraffic = False # done manually self._s2s_factory = XMPPS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually self._internal_component = InternalComponent(router) - self._internal_component.domains = set(config.domains) + self._internal_component.domains = set(self.domains) self._message_protocol = MessageProtocol() self._message_protocol.setHandlerParent(self._internal_component) self._presence_protocol = PresenceProtocol() self._presence_protocol.setHandlerParent(self._internal_component) + self._muc_component = InternalComponent(router) + self._muc_component.domains = set(self.muc_domains) + + self._muc_protocol = MUCProtocol() + self._muc_protocol.setHandlerParent(self._muc_component) + self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() + self.muc_session_manager = XMPPMucSessionManager() self.subscription_manager = XMPPSubscriptionManager() def start(self): self.stopped = False xmpp_logger.start() config = XMPPGatewayConfig self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) listen_address = self._s2s_listener.getHost() log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() + self.muc_session_manager.start() self.subscription_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) + notification_center.add_observer(self, sender=self._muc_component) self._internal_component.startService() + self._muc_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() self.subscription_manager.stop() + self.muc_session_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() + self._muc_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) + notification_center.remove_observer(self, sender=self._muc_component) xmpp_logger.stop() def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) + def send_muc_stanza(self, stanza): + if self.stopped: + return + self._muc_component.send(stanza.to_xml_element()) + def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification_center = NotificationCenter() notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): notification_center = NotificationCenter() composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) else: subscription.channel.send(stanza) + # Process muc stanzas + + def _NH_XMPPMucGotGroupChat(self, notification): + message = notification.data.message + muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host) + try: + session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)] + except KeyError: + # Ignore groupchat messages if there was no session created + pass + else: + session.channel.send(message) + + def _NH_XMPPMucGotPresenceAvailability(self, notification): + stanza = notification.data.presence_stanza + if not stanza.sender.uri.resource: + return + muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) + try: + session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)] + except KeyError: + notification_center = NotificationCenter() + if stanza.available: + notification_center.post_notification('XMPPGotMucJoinRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + notification_center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + else: + session.channel.send(stanza) + diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index 708771c..e5cbaa3 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,133 +1,193 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import NotificationCenter from sipsimple.util import TimestampedNotificationData -from wokkel.xmppim import MessageProtocol, PresenceProtocol +from wokkel.muc import UserPresence +from wokkel.xmppim import BasePresenceProtocol, MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI -from sylk.applications.xmppgateway.xmpp.stanzas import NormalMessage, MessageReceipt, ChatMessage, \ - ChatComposingIndication, ErrorStanza, AvailabilityPresence, SubscriptionPresence, ProbePresence -from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS +from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, ErrorStanza, \ + NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, \ + AvailabilityPresence, SubscriptionPresence, ProbePresence, \ + MUCAvailabilityPresence, GroupChatMessage \ +__all__ = ['MessageProtocol', 'MUCProtocol', 'PresenceProtocol'] class MessageProtocol(MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) type = msg.getAttribute('type') if type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.children] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) return if type in (None, 'normal', 'chat') and msg.body is not None: if msg.html is not None: content_type = 'text/html' body = msg.html.toXml() else: content_type = 'text/plain' body = unicode(msg.body) use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS if type == 'chat': message = ChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) return if type == 'chat' and msg.body is None: # Check if it's a composing indication for state in ('active', 'inactive', 'composing', 'paused', 'gone'): state_obj = getattr(msg, state, None) if state_obj is not None and state_obj.defaultUri == CHATSTATES_NS: composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if msg.body is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: receipt_id = msg.getAttribute('id', None) if receipt_id is not None: receipt = MessageReceipt(sender, recipient, receipt_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) class PresenceProtocol(PresenceProtocol): - def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) +class MUCProtocol(BasePresenceProtocol): + messageTypes = None, 'normal', 'chat', 'groupchat' + + presenceTypeParserMap = {'available': UserPresence, + 'unavailable': UserPresence} + + def connectionInitialized(self): + BasePresenceProtocol.connectionInitialized(self) + self.xmlstream.addObserver('/message', self._onMessage) + + def _onMessage(self, message): + if message.handled: + return + messageType = message.getAttribute("type") + if messageType == 'error': + return + if messageType not in self.messageTypes: + message['type'] = 'normal' + if messageType == 'groupchat': + self.onGroupChat(message) + else: + # TODO: give error, private messages not supported + pass + + def onGroupChat(self, msg): + sender_uri = FrozenURI.parse('xmpp:'+msg['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) + recipient = Identity(recipient_uri) + if msg.html is not None: + content_type = 'text/html' + body = msg.html.toXml() + else: + content_type = 'text/plain' + body = unicode(msg.body) + message = GroupChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=False) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPMucGotGroupChat', sender=self.parent, data=TimestampedNotificationData(message=message)) + + def availableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + + def unavailableReceived(self, stanza): + sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) + sender = Identity(sender_uri) + recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) + recipient = Identity(recipient_uri) + id = stanza.element.getAttribute('id') + presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) + notification_center = NotificationCenter() + notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py index e80eb09..2339c3e 100644 --- a/sylk/applications/xmppgateway/xmpp/session.py +++ b/sylk/applications/xmppgateway/xmpp/session.py @@ -1,141 +1,224 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlet import coros, proc from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements -from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza +from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, MUCAvailabilityPresence -__all__ = ['XMPPChatSession', 'XMPPChatSessionManager'] +__all__ = ['XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager'] +# Chat sessions + class XMPPChatSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.pending_receipts = {} self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidStart', sender=self, data=TimestampedNotificationData()) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self.send_composing_indication('gone') self._clear_pending_receipts() self._proc.kill() self._proc = None notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) self.state = 'terminated' def send_message(self, body, content_type='text/plain', message_id=None, use_receipt=True): message = ChatMessage(self.local_identity, self.remote_identity, body, content_type, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) def send_composing_indication(self, state, message_id=None, use_receipt=False): message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) def send_receipt_acknowledgement(self, receipt_id): message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) self.xmpp_manager.send_stanza(message) def send_error(self, stanza, error_type, conditions): message = ErrorStanza.from_stanza(stanza, error_type, conditions) self.xmpp_manager.send_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, ChatMessage): notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) elif isinstance(item, ChatComposingIndication): if item.state == 'gone': self._clear_pending_receipts() notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='remote')) self.state = 'terminated' break else: notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=TimestampedNotificationData(message=item)) elif isinstance(item, MessageReceipt): if item.receipt_id in self.pending_receipts: timer = self.pending_receipts.pop(item.receipt_id) timer.cancel() notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.receipt_id)) elif isinstance(item, ErrorStanza): if item.id in self.pending_receipts: timer = self.pending_receipts.pop(item.id) timer.cancel() # TODO: translate cause notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.id, code=503, reason='Service Unavailable')) self._proc = None def _receipt_timer_expired(self, message_id): self.pending_receipts.pop(message_id) notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) def _clear_pending_receipts(self): notification_center = NotificationCenter() while self.pending_receipts: message_id, timer = self.pending_receipts.popitem() timer.cancel() notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) class XMPPChatSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPChatSessionDidStart') notification_center.add_observer(self, name='XMPPChatSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPChatSessionDidStart') notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPChatSessionDidStart(self, notification): session = notification.sender self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session def _NH_XMPPChatSessionDidEnd(self, notification): session = notification.sender del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] + +# MUC sessions + +class XMPPIncomingMucSession(object): + local_identity = WriteOnceAttribute() + remote_identity = WriteOnceAttribute() + + def __init__(self, local_identity, remote_identity): + self.local_identity = local_identity + self.remote_identity = remote_identity + self.state = None + self.channel = coros.queue() + self._proc = None + from sylk.applications.xmppgateway.xmpp import XMPPManager + self.xmpp_manager = XMPPManager() + + def start(self): + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingMucSessionDidStart', sender=self, data=TimestampedNotificationData()) + self._proc = proc.spawn(self._run) + self.state = 'started' + + def end(self): + self._proc.kill() + self._proc = None + notification_center = NotificationCenter() + notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + + def send_message(self, sender, body, content_type='text/plain', message_id=None): + # TODO: timestamp? + message = GroupChatMessage(sender, self.remote_identity, body, content_type, id=message_id) + self.xmpp_manager.send_muc_stanza(message) + + def _run(self): + notification_center = NotificationCenter() + while True: + item = self.channel.wait() + if isinstance(item, GroupChatMessage): + notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) + elif isinstance(item, MUCAvailabilityPresence): + if item.available: + nickname = item.recipient.uri.resource + notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=TimestampedNotificationData(stanza=item, nickname=nickname)) + else: + notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + self.state = 'terminated' + break + self._proc = None + + +class XMPPMucSessionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.incoming = {} + self.outgoing = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='XMPPIncomingMucSessionDidStart') + notification_center.add_observer(self, name='XMPPIncomingMucSessionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidStart') + notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidEnd') + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_XMPPIncomingMucSessionDidStart(self, notification): + muc = notification.sender + self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] = muc + + def _NH_XMPPIncomingMucSessionDidEnd(self, notification): + muc = notification.sender + del self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] + diff --git a/sylk/applications/xmppgateway/xmpp/stanzas.py b/sylk/applications/xmppgateway/xmpp/stanzas.py index 37a8377..b30e4f7 100644 --- a/sylk/applications/xmppgateway/xmpp/stanzas.py +++ b/sylk/applications/xmppgateway/xmpp/stanzas.py @@ -1,189 +1,241 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from twisted.words.xish import domish CHATSTATES_NS = 'http://jabber.org/protocol/chatstates' RECEIPTS_NS = 'urn:xmpp:receipts' STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas' XML_NS = 'http://www.w3.org/XML/1998/namespace' +MUC_NS = 'http://jabber.org/protocol/muc' +MUC_USER_NS = MUC_NS + '#user' +MUC_ADMIN_NS = MUC_NS + '#admin' +MUC_OWNER_NS = MUC_NS + '#owner' +MUC_ROOMINFO_NS = MUC_NS + '#roominfo' +MUC_CONFIG_NS = MUC_NS + '#roomconfig' +MUC_REQUEST_NS = MUC_NS + '#request' +MUC_REGISTER_NS = MUC_NS + '#register' +# TODO: review ^^ class BaseStanza(object): stanza_type = None # to be defined by subclasses type = None def __init__(self, sender, recipient, id=None): self.sender = sender self.recipient = recipient self.id = id def to_xml_element(self): xml_element = domish.Element((None, self.stanza_type)) xml_element['from'] = self.sender.uri.as_string('xmpp') xml_element['to'] = self.recipient.uri.as_string('xmpp') if self.type: xml_element['type'] = self.type if self.id is not None: xml_element['id'] = self.id return xml_element +class ErrorStanza(object): + """ + Stanza representing an error of another stanza. It's not a base stanza type on its own. + """ + + def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): + self.stanza_type = stanza_type + self.sender = sender + self.recipient = recipient + self.id = id + self.conditions = conditions + self.error_type = error_type + + @classmethod + def from_stanza(cls, stanza, error_type, conditions): + # In error stanzas sender and recipient are swapped + return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) + + def to_xml_element(self): + xml_element = domish.Element((None, self.stanza_type)) + xml_element['from'] = self.sender.uri.as_string('xmpp') + xml_element['to'] = self.recipient.uri.as_string('xmpp') + xml_element['type'] = 'error' + if self.id is not None: + xml_element['id'] = self.id + error_element = domish.Element((None, 'error')) + error_element['type'] = self.error_type + [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] + xml_element.addChild(error_element) + return xml_element + + class BaseMessageStanza(BaseStanza): stanza_type = 'message' def __init__(self, sender, recipient, id=None, use_receipt=False): super(BaseMessageStanza, self).__init__(sender, recipient, id=id) self.use_receipt = use_receipt def to_xml_element(self): xml_element = super(BaseMessageStanza, self).to_xml_element() if self.id is not None and self.recipient.uri.resource is not None and self.use_receipt: xml_element.addElement('request', defaultUri=RECEIPTS_NS) return xml_element class NormalMessage(BaseMessageStanza): def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=False): super(NormalMessage, self).__init__(sender, recipient, id=id, use_receipt=use_receipt) self.body = body self.content_type = content_type def to_xml_element(self): xml_element = super(NormalMessage, self).to_xml_element() xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? return xml_element class ChatMessage(BaseMessageStanza): type = 'chat' def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=True): super(ChatMessage, self).__init__(sender, recipient, id=id, use_receipt=use_receipt) self.body = body self.content_type = content_type def to_xml_element(self): xml_element = super(ChatMessage, self).to_xml_element() xml_element.addElement('active', defaultUri=CHATSTATES_NS) xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? return xml_element class ChatComposingIndication(BaseMessageStanza): type = 'chat' def __init__(self, sender, recipient, state, id=None, use_receipt=False): super(ChatComposingIndication, self).__init__(sender, recipient, id=id, use_receipt=use_receipt) self.state = state def to_xml_element(self): xml_element = super(ChatComposingIndication, self).to_xml_element() xml_element.addElement(self.state, defaultUri=CHATSTATES_NS) return xml_element +class GroupChatMessage(BaseMessageStanza): + type = 'groupchat' + + def __init__(self, sender, recipient, body, content_type='text/plain', id=None, use_receipt=True): + super(GroupChatMessage, self).__init__(sender, recipient, id=id, use_receipt=use_receipt) + self.body = body + self.content_type = content_type + + def to_xml_element(self): + xml_element = super(GroupChatMessage, self).to_xml_element() + xml_element.addElement('body', content=self.body) # TODO: what if content type is text/html ? + return xml_element + + class MessageReceipt(BaseMessageStanza): def __init__(self, sender, recipient, receipt_id, id=None): super(MessageReceipt, self).__init__(sender, recipient, id=id, use_receipt=False) self.receipt_id = receipt_id def to_xml_element(self): xml_element = super(MessageReceipt, self).to_xml_element() receipt_element = domish.Element((RECEIPTS_NS, 'received')) receipt_element['id'] = self.receipt_id xml_element.addChild(receipt_element) return xml_element - class BasePresenceStanza(BaseStanza): stanza_type = 'presence' class AvailabilityPresence(BasePresenceStanza): def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None): super(AvailabilityPresence, self).__init__(sender, recipient, id=id) self.available = available self.show = show self.priority = priority self.statuses = statuses or {} def _get_available(self): return self.__dict__['available'] def _set_available(self, available): if available: self.type = None else: self.type = 'unavailable' self.__dict__['available'] = available available = property(_get_available, _set_available) del _get_available, _set_available @property def status(self): status = self.statuses.get(None) if status is None: try: status = self.statuses.itervalues().next() except StopIteration: pass return status def to_xml_element(self): xml_element = super(BasePresenceStanza, self).to_xml_element() if self.available: if self.show is not None: xml_element.addElement('show', content=self.show) if self.priority != 0: xml_element.addElement('priority', content=unicode(self.priority)) for lang, text in self.statuses.iteritems(): status = xml_element.addElement('status', content=text) if lang: status[(XML_NS, 'lang')] = lang return xml_element class SubscriptionPresence(BasePresenceStanza): def __init__(self, sender, recipient, type, id=None): super(SubscriptionPresence, self).__init__(sender, recipient, type=type, id=id) self.type = type class ProbePresence(BasePresenceStanza): type = 'probe' -class ErrorStanza(object): - """ - Stanza representing an error of another stanza. It's not a base stanza type on its own. - """ +class MUCAvailabilityPresence(AvailabilityPresence): + def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None, affiliation=None, jid=None, role=None, muc_statuses=None): + super(MUCAvailabilityPresence, self).__init__(sender, recipient, available, show, statuses, priority, id) + self.affiliation = affiliation or 'member' + self.role = role or 'participant' + self.muc_statuses = muc_statuses or [] + self.jid = jid - def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): - self.stanza_type = stanza_type - self.sender = sender - self.recipient = recipient - self.id = id - self.conditions = conditions - self.error_type = error_type + def to_xml_element(self): + xml_element = super(MUCAvailabilityPresence, self).to_xml_element() + muc = xml_element.addElement('x', defaultUri=MUC_USER_NS) + item = muc.addElement('item') + if self.affiliation: + item['affiliation'] = self.affiliation + if self.role: + item['role'] = self.role + if self.jid: + item['jid'] = self.jid.uri.as_string('xmpp') + for code in self.muc_statuses: + status = muc.addElement('status') + status['code'] = code + return xml_element - @classmethod - def from_stanza(cls, stanza, error_type, conditions): - # In error stanzas sender and recipient are swapped - return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) +class MUCErrorPresence(ErrorStanza): def to_xml_element(self): - xml_element = domish.Element((None, self.stanza_type)) - xml_element['from'] = self.sender.uri.as_string('xmpp') - xml_element['to'] = self.recipient.uri.as_string('xmpp') - xml_element['type'] = 'error' - if self.id is not None: - xml_element['id'] = self.id - error_element = domish.Element((None, 'error')) - error_element['type'] = self.error_type - [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] - xml_element.addChild(error_element) + xml_element = super(MUCErrorPresence, self).to_xml_element() + xml_element.addElement('x', defaultUri=MUC_USER_NS) return xml_element -