diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index a03daf9..dc10a41 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,303 +1,354 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from sipsimple.util import ISOTimestamp from twisted.internet import reactor -from twisted.words.protocols.jabber.jid import internJID as JID +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.protocols.jabber.jid import JID, internJID +from wokkel import disco from wokkel.component import InternalComponent, Router as _Router from wokkel.server import ServerService, XMPPS2SServerFactory, DeferredS2SClientFactory from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.xmpp.logger import Logger as XMPPLogger -from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, MUCServerProtocol, PresenceProtocol +from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, MessageProtocol, MUCServerProtocol, PresenceProtocol from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) xmpp_logger = XMPPLogger() # Utility classes class Router(_Router): def route(self, stanza): """ Route a stanza. (subclassed to avoid vebose logging) @param stanza: The stanza to be routed. @type stanza: L{domish.Element}. """ - destination = JID(stanza['to']) + destination = internJID(stanza['to']) if destination.host in self.routes: self.routes[destination.host].send(stanza) else: self.routes[None].send(stanza) class XMPPS2SServerFactory(XMPPS2SServerFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf) def logDataOut(buf): buf = buf.strip() if buf: xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut class DeferredS2SClientFactory(DeferredS2SClientFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf) def logDataOut(buf): buf = buf.strip() if buf: xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut # Patch Wokkel's DeferredS2SClientFactory to use our logger import wokkel.server wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory del wokkel.server # Manager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False - self.domains = config.domains - self.muc_domains = ['%s.%s' % (config.muc_prefix, domain) for domain in self.domains] + self.domains = set(config.domains) + self.muc_domains = set(['%s.%s' % (config.muc_prefix, domain) for domain in self.domains]) router = Router() self._server_service = ServerService(router) - self._server_service.domains = set(self.domains+self.muc_domains) + self._server_service.domains = self.domains | self.muc_domains self._server_service.logTraffic = False # done manually self._s2s_factory = XMPPS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually self._internal_component = InternalComponent(router) - self._internal_component.domains = set(self.domains) + self._internal_component.domains = self.domains self._message_protocol = MessageProtocol() self._message_protocol.setHandlerParent(self._internal_component) self._presence_protocol = PresenceProtocol() self._presence_protocol.setHandlerParent(self._internal_component) + self._disco_protocol = DiscoProtocol() + self._disco_protocol.setHandlerParent(self._internal_component) + self._muc_component = InternalComponent(router) - self._muc_component.domains = set(self.muc_domains) + self._muc_component.domains = self.muc_domains self._muc_protocol = MUCServerProtocol() self._muc_protocol.setHandlerParent(self._muc_component) + self._disco_muc_protocol = DiscoProtocol() + self._disco_muc_protocol.setHandlerParent(self._muc_component) + self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() self.muc_session_manager = XMPPMucSessionManager() self.subscription_manager = XMPPSubscriptionManager() def start(self): self.stopped = False xmpp_logger.start() config = XMPPGatewayConfig self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) listen_address = self._s2s_listener.getHost() log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() self.muc_session_manager.start() self.subscription_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) notification_center.add_observer(self, sender=self._muc_component) self._internal_component.startService() self._muc_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() self.subscription_manager.stop() self.muc_session_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() self._muc_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) notification_center.remove_observer(self, sender=self._muc_component) xmpp_logger.stop() def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) def send_muc_stanza(self, stanza): if self.stopped: return self._muc_component.send(stanza.to_xml_element()) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification_center = NotificationCenter() notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): notification_center = NotificationCenter() composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) # Process muc stanzas def _NH_XMPPMucGotGroupChat(self, notification): message = notification.data.message muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)] except KeyError: # Ignore groupchat messages if there was no session created pass else: session.channel.send(message) def _NH_XMPPMucGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if not stanza.sender.uri.resource: return muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)] except KeyError: notification_center = NotificationCenter() if stanza.available: notification_center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza)) else: notification_center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza)) else: session.channel.send(stanza) + # Disco + + def _NH_XMPPGotDiscoInfoRequest(self, notification): + d = notification.data.deferred + target_uri = notification.data.target.uri + + if notification.data.node_identifier: + d.errback(StanzaError('service-unavailable')) + return + + if target_uri.host not in self.domains | self.muc_domains: + d.errback(StanzaError('service-unavailable')) + return + + elements = [] + + if target_uri.host in self.muc_domains: + elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service')) + elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc')) + if target_uri.user: + # We can't say much more here, because the actual conference may end up on a different server + elements.append(disco.DiscoFeature('muc_temporary')) + elements.append(disco.DiscoFeature('muc_unmoderated')) + else: + if not target_uri.user: + elements.append(disco.DiscoIdentity('gateway', 'simple')) + elements.append(disco.DiscoIdentity('server', 'im')) + else: + elements.append(disco.DiscoIdentity('account', 'registered')) + + elements.append(disco.DiscoFeature('http://sylkserver.com')) + d.callback(elements) + + def _NH_XMPPGotDiscoItemsRequest(self, notification): + d = notification.data.deferred + target_uri = notification.data.target.uri + items = [] + + if not target_uri.user and target_uri.host in self.domains: + items.append(disco.DiscoItem(JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target_uri.host)), name='Multi-User Chat')) + + d.callback(items) + diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index 0e1fe00..b653edb 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,192 +1,249 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import NotificationCenter, NotificationData +from twisted.internet import defer +from wokkel.disco import DiscoHandler from wokkel.muc import UserPresence from wokkel.xmppim import BasePresenceProtocol, MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, ErrorStanza, \ NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, \ AvailabilityPresence, SubscriptionPresence, ProbePresence, \ MUCAvailabilityPresence, GroupChatMessage \ -__all__ = ['MessageProtocol', 'MUCServerProtocol', 'PresenceProtocol'] +__all__ = ['DiscoProtocol', 'MessageProtocol', 'MUCServerProtocol', 'PresenceProtocol'] class MessageProtocol(MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) type = msg.getAttribute('type') if type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.children] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message)) return if type in (None, 'normal', 'chat') and msg.body is not None or msg.html is not None: body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS if type == 'chat': message = ChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message)) return if type == 'chat' and msg.body is None and msg.html is None: # Check if it's a composing indication for state in ('active', 'inactive', 'composing', 'paused', 'gone'): state_obj = getattr(msg, state, None) if state_obj is not None and state_obj.defaultUri == CHATSTATES_NS: composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if msg.body is None and msg.html is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: receipt_id = msg.getAttribute('id', None) if receipt_id is not None: receipt = MessageReceipt(sender, recipient, receipt_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt)) class PresenceProtocol(PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class MUCServerProtocol(BasePresenceProtocol): messageTypes = None, 'normal', 'chat', 'groupchat' presenceTypeParserMap = {'available': UserPresence, 'unavailable': UserPresence} def connectionInitialized(self): BasePresenceProtocol.connectionInitialized(self) self.xmlstream.addObserver('/message', self._onMessage) def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType == 'error': return if messageType not in self.messageTypes: message['type'] = 'normal' if messageType == 'groupchat': self.onGroupChat(message) else: # TODO: give error, private messages not supported pass def onGroupChat(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None)) notification_center = NotificationCenter() notification_center.post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message)) def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) + +class DiscoProtocol(DiscoHandler): + + def info(self, requestor, target, nodeIdentifier): + """ + Gather data for a disco info request. + + @param requestor: The entity that sent the request. + @type requestor: L{JID} + @param target: The entity the request was sent to. + @type target: L{JID} + @param nodeIdentifier: The optional node being queried, or C{''}. + @type nodeIdentifier: C{unicode} + @return: Deferred with the gathered results from sibling handlers. + @rtype: L{defer.Deferred} + """ + + d = defer.Deferred() + + sender_uri = FrozenURI.parse(requestor) + sender = Identity(sender_uri) + target_uri = FrozenURI.parse(target) + target = Identity(target_uri) + + data = NotificationData(sender=sender, target=target, node_identifier=nodeIdentifier, deferred=d) + NotificationCenter().post_notification('XMPPGotDiscoInfoRequest', sender=self.parent, data=data) + + return d + + def items(self, requestor, target, nodeIdentifier): + """ + Gather data for a disco items request. + + @param requestor: The entity that sent the request. + @type requestor: L{JID} + @param target: The entity the request was sent to. + @type target: L{JID} + @param nodeIdentifier: The optional node being queried, or C{''}. + @type nodeIdentifier: C{unicode} + @return: Deferred with the gathered results from sibling handlers. + @rtype: L{defer.Deferred} + """ + + d = defer.Deferred() + + sender_uri = FrozenURI.parse(requestor) + sender = Identity(sender_uri) + target_uri = FrozenURI.parse(target) + target = Identity(target_uri) + + data = NotificationData(sender=sender, target=target, node_identifier=nodeIdentifier, deferred=d) + NotificationCenter().post_notification('XMPPGotDiscoItemsRequest', sender=self.parent, data=data) + + return d +