diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index 3db331d..1159e31 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,252 +1,252 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from datetime import datetime from sipsimple.util import Timestamp, TimestampedNotificationData from twisted.internet import reactor from twisted.words.protocols.jabber.jid import internJID as JID from wokkel.component import InternalComponent, Router as _Router -from wokkel.server import ServerService, XMPPS2SServerFactory as _XMPPS2SServerFactory, DeferredS2SClientFactory as _DeferredS2SClientFactory +from wokkel.server import ServerService, XMPPS2SServerFactory, DeferredS2SClientFactory from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.logger import Logger from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) xmpp_logger = Logger() # Utility classes class Router(_Router): def route(self, stanza): """ Route a stanza. (subclassed to avoid vebose logging) @param stanza: The stanza to be routed. @type stanza: L{domish.Element}. """ destination = JID(stanza['to']) if destination.host in self.routes: self.routes[destination.host].send(stanza) else: self.routes[None].send(stanza) -class XMPPS2SServerFactory(_XMPPS2SServerFactory): +class XMPPS2SServerFactory(XMPPS2SServerFactory): def onConnectionMade(self, xs): - super(XMPPS2SServerFactory, self).onConnectionMade(xs) + super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) def logDataOut(buf): buf = buf.strip() if buf: xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut -class DeferredS2SClientFactory(_DeferredS2SClientFactory): +class DeferredS2SClientFactory(DeferredS2SClientFactory): def onConnectionMade(self, xs): - super(DeferredS2SClientFactory, self).onConnectionMade(xs) + super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): if buf: xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) def logDataOut(buf): if buf: xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut # Patch Wokkel's DeferredS2SClientFactory to use our logger import wokkel.server wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory del wokkel.server # Manager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False router = Router() self._server_service = ServerService(router) self._server_service.domains = set(config.domains) self._server_service.logTraffic = False # done manually self._s2s_factory = XMPPS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually self._internal_component = InternalComponent(router) self._internal_component.domains = set(config.domains) self._message_protocol = MessageProtocol() self._message_protocol.setHandlerParent(self._internal_component) self._presence_protocol = PresenceProtocol() self._presence_protocol.setHandlerParent(self._internal_component) self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() self.subscription_manager = XMPPSubscriptionManager() def start(self): self.stopped = False xmpp_logger.start() config = XMPPGatewayConfig self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) listen_address = self._s2s_listener.getHost() log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() self.subscription_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) self._internal_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() self.subscription_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) xmpp_logger.stop() def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification_center = NotificationCenter() notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): notification_center = NotificationCenter() composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) else: subscription.channel.send(stanza) diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index 3756d07..1dfdd2f 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,137 +1,138 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import NotificationCenter from sipsimple.util import TimestampedNotificationData -from wokkel.xmppim import MessageProtocol as _MessageProtocol, PresenceProtocol as _PresenceProtocol +from wokkel.xmppim import MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.stanzas import NormalMessage, MessageReceipt, ChatMessage, \ ChatComposingIndication, ErrorStanza, AvailabilityPresence, SubscriptionPresence, ProbePresence from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS -class MessageProtocol(_MessageProtocol): + +class MessageProtocol(MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) type = msg.getAttribute('type') if type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.children] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) return if type in (None, 'normal', 'chat') and msg.body is not None: if msg.html is not None: content_type = 'text/html' body = msg.html.toXml() else: content_type = 'text/plain' body = unicode(msg.body) use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS if type == 'chat': message = ChatMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, content_type, id=msg.getAttribute('id', None), use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) return if type == 'chat' and msg.body is None: # Check if it's a composing indication for attr in ('active', 'inactive', 'composing', 'paused', 'gone'): attr_obj = getattr(msg, attr, None) if attr_obj is not None and attr_obj.defaultUri == CHATSTATES_NS: state = attr break else: state = None if state is not None: composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if msg.body is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: receipt_id = msg.getAttribute('id', None) if receipt_id is not None: receipt = MessageReceipt(sender, recipient, receipt_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) -class PresenceProtocol(_PresenceProtocol): +class PresenceProtocol(PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) notification_center = NotificationCenter() notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza))