Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py
index 78c8626..18eb99e 100644
--- a/sylk/applications/xmppgateway/muc.py
+++ b/sylk/applications/xmppgateway/muc.py
@@ -1,466 +1,475 @@
import uuid
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages
from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams import MediaStreamRegistry
from sipsimple.streams.msrp.chat import ChatStreamError, ChatIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession
from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS
from sylk.configuration import SIPConfig
from sylk.session import Session
class ReferralError(Exception):
    """Raised when sending or monitoring the SIP REFER fails.

    error -- human readable description of the failure.
    code  -- numeric code associated with the failure; 0 when none applies.
    """

    def __init__(self, error, code=0):
        # Pass the description to the base class: instances are created with
        # keyword arguments, which would otherwise leave ``args`` empty and
        # make ``str(exc)`` an empty string in logs and tracebacks.
        Exception.__init__(self, error)
        self.error = error
        self.code = code
class SIPReferralDidFail(Exception):
    """Exception used to carry the data of a failed referral notification."""

    def __init__(self, data):
        # Keep the notification payload so the handler can inspect it.
        payload = data
        self.data = payload
class MucInvitationFailure(object):
    """Value object describing why a MUC invitation failed."""

    def __init__(self, code, reason):
        self.code, self.reason = code, reason

    def __str__(self):
        # Rendered as "<code> (<reason>)".
        parts = (self.code, self.reason)
        return '%s (%s)' % parts
class X2SMucInvitationHandler(object):
    """Relays an XMPP MUC invitation to SIP by sending a REFER request.

    A REFER with ``method=INVITE`` pointing at ``participant`` is sent to
    ``recipient`` using ``sender`` in the From header. The outcome is
    published through the X2SMucInvitationHandlerDidStart,
    X2SMucInvitationHandlerDidEnd and X2SMucInvitationHandlerDidFail
    notifications.
    """

    implements(IObserver)

    def __init__(self, sender, recipient, participant):
        self.sender = sender
        self.recipient = recipient
        self.participant = participant
        # True only while an established referral is being monitored
        self.active = False
        # Route currently being tried or in use
        self.route = None
        # Carries referral notifications (or failures) into _run
        self._channel = coros.queue()
        self._referral = None
        # MucInvitationFailure instance once the invitation has failed
        self._failure = None

    def start(self):
        """Subscribe to network changes and spawn the referral procedure."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, name='NetworkConditionsDidChange')
        proc.spawn(self._run)
        notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self)

    def _run(self):
        """Look up routes, send the REFER and follow it until it ends.

        Routes are tried in order within an overall 30 second budget. A 403
        or 405 answer aborts immediately, any other failure moves on to the
        next route. Always finishes by posting either the DidFail or the
        DidEnd notification.
        """
        notification_center = NotificationCenter()
        settings = SIPSimpleSettings()

        sender_uri = self.sender.uri.as_sip_uri()
        recipient_uri = self.recipient.uri.as_sip_uri()
        participant_uri = self.participant.uri.as_sip_uri()

        try:
            # Lookup routes
            account = DefaultAccount()
            if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
                uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
            elif account.sip.always_use_my_proxy:
                uri = SIPURI(host=account.id.domain)
            else:
                uri = SIPURI.new(recipient_uri)
            lookup = DNSLookup()
            try:
                routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
            except DNSLookupError as e:
                raise ReferralError(error='DNS lookup failed: %s' % e)

            # Absolute deadline shared by all route attempts
            timeout = time() + 30
            for route in routes:
                self.route = route
                remaining_time = timeout - time()
                if remaining_time > 0:
                    transport = route.transport
                    parameters = {} if transport=='udp' else {'transport': transport}
                    contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
                    refer_to_header = ReferToHeader(str(participant_uri))
                    refer_to_header.parameters['method'] = 'INVITE'
                    referral = Referral(recipient_uri, FromHeader(sender_uri),
                                        ToHeader(recipient_uri),
                                        refer_to_header,
                                        ContactHeader(contact_uri),
                                        RouteHeader(route.uri),
                                        account.credentials)
                    notification_center.add_observer(self, sender=referral)
                    try:
                        referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
                    except SIPCoreError:
                        notification_center.remove_observer(self, sender=referral)
                        raise ReferralError(error='Internal error')
                    self._referral = referral
                    try:
                        # Wait until the referral is accepted; a failure is
                        # delivered through the channel as an exception.
                        while True:
                            notification = self._channel.wait()
                            if notification.name == 'SIPReferralDidStart':
                                break
                    except SIPReferralDidFail as e:
                        notification_center.remove_observer(self, sender=referral)
                        self._referral = None
                        if e.data.code in (403, 405):
                            raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
                        else:
                            # Otherwise just try the next route
                            continue
                    else:
                        break
            else:
                # Loop finished without a successful referral
                self.route = None
                raise ReferralError(error='No more routes to try')
            # At this point it is subscribed. Handle notifications and ending/failures.
            try:
                self.active = True
                while True:
                    notification = self._channel.wait()
                    if notification.name == 'SIPReferralDidEnd':
                        break
            except SIPReferralDidFail as e:
                notification_center.remove_observer(self, sender=self._referral)
                raise ReferralError(error=e.data.reason, code=e.data.code)
            else:
                notification_center.remove_observer(self, sender=self._referral)
            finally:
                self.active = False

        except ReferralError as e:
            self._failure = MucInvitationFailure(e.code, e.error)
        finally:
            notification_center.remove_observer(self, name='NetworkConditionsDidChange')
            self._referral = None
            if self._failure is not None:
                notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
            else:
                notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self)

    def _refresh(self):
        """Refresh the referral with a Contact header rebuilt from the current local address."""
        account = DefaultAccount()
        transport = self.route.transport
        parameters = {} if transport=='udp' else {'transport': transport}
        contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
        contact_header = ContactHeader(contact_uri)
        self._referral.refresh(contact_header=contact_header, timeout=2)

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Dispatch a notification to the matching ``_NH_<name>`` method, if any."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPReferralDidStart(self, notification):
        self._channel.send(notification)

    def _NH_SIPReferralDidEnd(self, notification):
        self._channel.send(notification)

    def _NH_SIPReferralDidFail(self, notification):
        # Delivered as an exception so that the waiter in _run is interrupted
        self._channel.send_exception(SIPReferralDidFail(notification.data))

    def _NH_SIPReferralGotNotify(self, notification):
        self._channel.send(notification)

    def _NH_NetworkConditionsDidChange(self, notification):
        if self.active:
            self._refresh()
class S2XMucInvitationHandler(object):
    """Relays a SIP originated conference invitation to an XMPP user.

    An OutgoingInvitationMessage stanza is sent when the handler starts and
    the SIP session is observed for failure. If nothing happens within 90
    seconds the invitation is considered timed out. Completion is published
    through S2XMucInvitationHandlerDidEnd / S2XMucInvitationHandlerDidFail.
    """

    implements(IObserver)

    def __init__(self, session, sender, recipient, inviter):
        self.session = session
        self.sender = sender
        self.recipient = recipient
        self.inviter = inviter
        # Delayed call that fires _timeout
        self._timer = None
        # MucInvitationFailure instance once the invitation has failed
        self._failure = None

    def start(self):
        """Send the invitation stanza and arm the 90 second timeout."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self.session)
        stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex)
        xmpp_manager = XMPPManager()
        xmpp_manager.send_muc_stanza(stanza)
        self._timer = reactor.callLater(90, self._timeout)
        notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self)

    def stop(self):
        """Cancel the timer, release the session and publish the outcome."""
        if self._timer is not None and self._timer.active():
            self._timer.cancel()
        self._timer = None
        notification_center = NotificationCenter()
        if self.session is not None:
            notification_center.remove_observer(self, sender=self.session)
            # The session is ended a few seconds later rather than right away
            reactor.callLater(5, self._end_session, self.session)
            self.session = None
        if self._failure is not None:
            notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
        else:
            notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self)

    def _end_session(self, session):
        # Best effort: errors while ending the session are ignored
        try:
            session.end(480)
        except Exception:
            pass

    def _timeout(self):
        """Give up on the invitation: end the session with 408 and stop."""
        NotificationCenter().remove_observer(self, sender=self.session)
        # Best effort: errors while ending the session are ignored
        try:
            self.session.end(408)
        except Exception:
            pass
        self.session = None
        # Keyword arguments keep code and reason in their intended slots
        self._failure = MucInvitationFailure(code=408, reason='Timeout')
        self.stop()

    def handle_notification(self, notification):
        """Dispatch a notification to the matching ``_NH_<name>`` method, if any."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidFail(self, notification):
        notification.center.remove_observer(self, sender=self.session)
        self.session = None
        reason = notification.data.reason or notification.data.failure_reason
        # Keyword arguments keep code and reason in their intended slots
        self._failure = MucInvitationFailure(code=notification.data.code, reason=reason)
        self.stop()
class X2SMucHandler(object):
    """Bridges an XMPP user joining a MUC room to a SIP/MSRP conference.

    An incoming XMPP MUC session is paired with an outgoing SIP session that
    carries an MSRP chat stream. Messages, nickname changes, subject changes
    and conference information are translated between the two sides. The
    X2SMucHandlerDidStart and X2SMucHandlerDidEnd notifications are posted.
    """

    implements(IObserver)

    sip_identity = WriteOnceAttribute()
    xmpp_identity = WriteOnceAttribute()

    def __init__(self, sip_identity, xmpp_identity, nickname):
        self.sip_identity = sip_identity
        self.xmpp_identity = xmpp_identity
        self.nickname = nickname
        self._xmpp_muc_session = None
        self._sip_session = None
        self._msrp_stream = None
        self._first_stanza = None
        self._pending_nicknames_map = {}    # map message ID of MSRP NICKNAME chunk to corresponding stanza
        self._pending_messages_map = {}     # map message ID of MSRP SEND chunk to corresponding stanza
        self._participants = set()          # set of (URI, nickname) tuples
        self.ended = False

    def start(self):
        """Start the XMPP side and then initiate the outgoing SIP session."""
        notification_center = NotificationCenter()
        self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
        notification_center.add_observer(self, sender=self._xmpp_muc_session)
        self._xmpp_muc_session.start()
        notification_center.post_notification('X2SMucHandlerDidStart', sender=self)
        self._start_sip_session()

    def end(self):
        """Tear down both sessions; calling it again after it ended is a no-op."""
        if self.ended:
            return
        notification_center = NotificationCenter()
        if self._xmpp_muc_session is not None:
            notification_center.remove_observer(self, sender=self._xmpp_muc_session)
            # Send indication that the user has been kicked from the room
            sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
            stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
            stanza.jid = self.xmpp_identity
            stanza.muc_statuses.append('307')
            xmpp_manager = XMPPManager()
            xmpp_manager.send_muc_stanza(stanza)
            self._xmpp_muc_session.end()
            self._xmpp_muc_session = None
        if self._sip_session is not None:
            notification_center.remove_observer(self, sender=self._sip_session)
            self._sip_session.end()
            self._sip_session = None
        self.ended = True
        notification_center.post_notification('X2SMucHandlerDidEnd', sender=self)

    @run_in_green_thread
    def _start_sip_session(self):
        """Resolve a route and connect the SIP session with a chat stream.

        The handler is ended if the DNS lookup fails or yields no route.
        """
        # self.xmpp_identity is our local identity
        from_uri = self.xmpp_identity.uri.as_sip_uri()
        del from_uri.parameters['gr']    # no GRUU in From header
        contact_uri = self.xmpp_identity.uri.as_sip_uri()
        contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8'))
        to_uri = self.sip_identity.uri.as_sip_uri()
        lookup = DNSLookup()
        settings = SIPSimpleSettings()
        account = DefaultAccount()
        if account.sip.outbound_proxy is not None:
            uri = SIPURI(host=account.sip.outbound_proxy.host,
                         port=account.sip.outbound_proxy.port,
                         parameters={'transport': account.sip.outbound_proxy.transport})
        else:
            uri = to_uri
        try:
            routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
        except DNSLookupError:
            log.warning('DNS lookup error while looking for %s proxy' % uri)
            self.end()
            return
        if not routes:
            # Without this guard routes.pop(0) below would raise IndexError
            # and leave the handler running without a SIP session.
            log.warning('No routes found while looking for %s proxy' % uri)
            self.end()
            return
        self._msrp_stream = MediaStreamRegistry.get('chat')()
        route = routes.pop(0)
        from_header = FromHeader(from_uri)
        to_header = ToHeader(to_uri)
        contact_header = ContactHeader(contact_uri)
        self._sip_session = Session(account)
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self._sip_session)
        notification_center.add_observer(self, sender=self._msrp_stream)
        self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream])

    def _relay_text_to_sip(self, message, text):
        """Send ``text`` over the MSRP stream on behalf of the sender of ``message``."""
        sender_uri = message.sender.uri.as_sip_uri()
        del sender_uri.parameters['gr']    # no GRUU in CPIM From header
        sender = ChatIdentity(sender_uri, display_name=self.nickname)
        message_id = self._msrp_stream.send_message(text, 'text/plain', sender=sender)
        # Message will be echoed back to the sender on ChatStreamDidDeliverMessage
        self._pending_messages_map[message_id] = message

    def handle_notification(self, notification):
        """Dispatch a notification to the matching ``_NH_<name>`` method, if any."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidStart(self, notification):
        log.info("SIP multiparty session %s started" % self._sip_session.call_id)
        # The remote party must be a conference focus that allows nicknames
        if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed:
            self.end()
            return
        message_id = self._msrp_stream.set_local_nickname(self.nickname)
        self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza)
        self._first_stanza = None

    def _NH_SIPSessionDidEnd(self, notification):
        log.info("SIP multiparty session %s ended" % self._sip_session.call_id)
        notification.center.remove_observer(self, sender=self._sip_session)
        notification.center.remove_observer(self, sender=self._msrp_stream)
        self._sip_session = None
        self._msrp_stream = None
        self.end()

    def _NH_SIPSessionDidFail(self, notification):
        log.info("SIP multiparty session %s failed" % self._sip_session.call_id)
        notification.center.remove_observer(self, sender=self._sip_session)
        notification.center.remove_observer(self, sender=self._msrp_stream)
        self._sip_session = None
        self._msrp_stream = None
        self.end()

    def _NH_SIPSessionNewProposal(self, notification):
        if notification.data.originator == 'remote':
            self._sip_session.reject_proposal()

    def _NH_SIPSessionTransferNewIncoming(self, notification):
        self._sip_session.reject_transfer(403)

    def _NH_SIPSessionGotConferenceInfo(self, notification):
        # Translate to XMPP payload
        xmpp_manager = XMPPManager()
        own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host)
        conference_info = notification.data.conference_info
        new_participants = set()
        for user in conference_info.users:
            user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity)
            nickname = user.display_text.value if user.display_text else user.entity
            new_participants.add((user_uri, nickname))
        # Remove participants that are no longer in the room
        for uri, nickname in self._participants - new_participants:
            sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
            stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
            xmpp_manager.send_muc_stanza(stanza)
        # Send presence for current participants
        for uri, nickname in new_participants:
            if uri == own_uri:
                continue
            sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
            stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
            stanza.jid = Identity(uri)
            xmpp_manager.send_muc_stanza(stanza)
        self._participants = new_participants
        # Send own status last
        sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
        stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
        stanza.jid = self.xmpp_identity
        stanza.muc_statuses.append('110')
        xmpp_manager.send_muc_stanza(stanza)

    def _NH_ChatStreamGotMessage(self, notification):
        # Notification is sent by the MSRP stream
        if not self._xmpp_muc_session:
            return
        message = notification.data.message
        content_type = message.content_type.lower()
        if content_type not in ('text/plain', 'text/html'):
            return
        if content_type == 'text/plain':
            html_body = None
            body = message.content
        else:
            html_body = message.content
            body = None
        resource = message.sender.display_name or str(message.sender.uri)
        sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource))
        self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex)
        self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')

    def _NH_ChatStreamDidSetNickname(self, notification):
        # Notification is sent by the MSRP stream
        nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
        self.nickname = nickname

    def _NH_ChatStreamDidNotSetNickname(self, notification):
        # Notification is sent by the MSRP stream
        nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
        error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)])
        xmpp_manager = XMPPManager()
        xmpp_manager.send_muc_stanza(error_stanza)

    def _NH_ChatStreamDidDeliverMessage(self, notification):
        # Echo back the message to the sender
        stanza = self._pending_messages_map.pop(notification.data.message_id)
        stanza.sender, stanza.recipient = stanza.recipient, stanza.sender
        stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname)
        xmpp_manager = XMPPManager()
        xmpp_manager.send_muc_stanza(stanza)

    def _NH_ChatStreamDidNotDeliverMessage(self, notification):
        self._pending_messages_map.pop(notification.data.message_id)

    def _NH_XMPPIncomingMucSessionDidEnd(self, notification):
        notification.center.remove_observer(self, sender=self._xmpp_muc_session)
        self._xmpp_muc_session = None
        self.end()

    def _NH_XMPPIncomingMucSessionGotMessage(self, notification):
        if not self._sip_session:
            return
        message = notification.data.message
        self._relay_text_to_sip(message, message.body)

    def _NH_XMPPIncomingMucSessionChangedNickname(self, notification):
        if not self._sip_session:
            return
        nickname = notification.data.nickname
        try:
            message_id = self._msrp_stream.set_local_nickname(nickname)
        except ChatStreamError:
            return
        self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza)

    def _NH_XMPPIncomingMucSessionSubject(self, notification):
        if not self._sip_session:
            return
        message = notification.data.message
        # The subject change is relayed to the room as a regular text message
        self._relay_text_to_sip(message, 'Conference title set to: %s' % message.body)
diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py
index 05897a0..0936249 100644
--- a/sylk/applications/xmppgateway/xmpp/__init__.py
+++ b/sylk/applications/xmppgateway/xmpp/__init__.py
@@ -1,319 +1,330 @@
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from twisted.internet import reactor
from wokkel.disco import DiscoClientProtocol
from wokkel.generic import FallbackHandler, VersionHandler
from wokkel.ping import PingHandler
from wokkel.server import ServerService, XMPPS2SServerFactory
from zope.interface import implements
from sylk import __version__ as SYLK_VERSION
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import FrozenURI
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession, JingleSessionManager
from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, JingleProtocol, MessageProtocol, MUCServerProtocol, MUCPresenceProtocol, PresenceProtocol
from sylk.applications.xmppgateway.xmpp.server import SylkInternalComponent, SylkRouter
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager
from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager
class XMPPManager(object):
    """Singleton that owns the XMPP server-to-server side of the gateway.

    It wires up two internal components (one for the regular domains, one
    for the MUC domains), attaches the protocol handlers to them, and routes
    incoming stanzas either to an existing session/subscription or re-posts
    them as notifications with this manager as sender.
    """

    __metaclass__ = Singleton

    implements(IObserver)

    def __init__(self):
        config = XMPPGatewayConfig
        self.stopped = False

        self.domains = set(config.domains)
        self.muc_domains = set('%s.%s' % (config.muc_prefix, domain) for domain in self.domains)

        router = SylkRouter()
        self._server_service = ServerService(router)
        self._server_service.domains = self.domains | self.muc_domains
        self._server_service.logTraffic = False    # done manually

        self._s2s_factory = XMPPS2SServerFactory(self._server_service)
        self._s2s_factory.logTraffic = False    # done manually

        # Setup internal components

        self._internal_component = SylkInternalComponent(router)
        self._internal_component.domains = self.domains
        self._internal_component.manager = self

        self._muc_component = SylkInternalComponent(router)
        self._muc_component.domains = self.muc_domains
        self._muc_component.manager = self

        # Setup protocols

        self.message_protocol = MessageProtocol()
        self.message_protocol.setHandlerParent(self._internal_component)

        self.presence_protocol = PresenceProtocol()
        self.presence_protocol.setHandlerParent(self._internal_component)

        self.disco_protocol = DiscoProtocol()
        self.disco_protocol.setHandlerParent(self._internal_component)

        self.disco_client_protocol = DiscoClientProtocol()
        self.disco_client_protocol.setHandlerParent(self._internal_component)

        self.muc_protocol = MUCServerProtocol()
        self.muc_protocol.setHandlerParent(self._muc_component)

        self.muc_presence_protocol = MUCPresenceProtocol()
        self.muc_presence_protocol.setHandlerParent(self._muc_component)

        self.disco_muc_protocol = DiscoProtocol()
        self.disco_muc_protocol.setHandlerParent(self._muc_component)

        self.version_protocol = VersionHandler('SylkServer', SYLK_VERSION)
        self.version_protocol.setHandlerParent(self._internal_component)

        self.fallback_protocol = FallbackHandler()
        self.fallback_protocol.setHandlerParent(self._internal_component)

        self.fallback_muc_protocol = FallbackHandler()
        self.fallback_muc_protocol.setHandlerParent(self._muc_component)

        self.ping_protocol = PingHandler()
        self.ping_protocol.setHandlerParent(self._internal_component)

        self.jingle_protocol = JingleProtocol()
        self.jingle_protocol.setHandlerParent(self._internal_component)

        self.jingle_coin_protocol = JingleProtocol()
        self.jingle_coin_protocol.setHandlerParent(self._muc_component)

        self._s2s_listener = None

        self.chat_session_manager = XMPPChatSessionManager()
        self.muc_session_manager = XMPPMucSessionManager()
        self.subscription_manager = XMPPSubscriptionManager()
        self.jingle_session_manager = JingleSessionManager()

    def start(self):
        """Open the listening socket and start the managers and components."""
        self.stopped = False
        # noinspection PyUnresolvedReferences
        self._s2s_listener = reactor.listenTCP(XMPPGatewayConfig.local_port, self._s2s_factory, interface=XMPPGatewayConfig.local_ip)
        listen_address = self._s2s_listener.getHost()
        log.info("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port))
        self.chat_session_manager.start()
        self.muc_session_manager.start()
        self.subscription_manager.start()
        self.jingle_session_manager.start()
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self._internal_component)
        notification_center.add_observer(self, sender=self._muc_component)
        self._internal_component.startService()
        self._muc_component.startService()

    def stop(self):
        """Stop listening and shut the managers down in reverse start order."""
        self.stopped = True
        self._s2s_listener.stopListening()
        self.jingle_session_manager.stop()
        self.subscription_manager.stop()
        self.muc_session_manager.stop()
        self.chat_session_manager.stop()
        self._internal_component.stopService()
        self._muc_component.stopService()
        notification_center = NotificationCenter()
        notification_center.remove_observer(self, sender=self._internal_component)
        notification_center.remove_observer(self, sender=self._muc_component)

    def send_stanza(self, stanza):
        """Send a stanza through the regular component; dropped once stopped."""
        if self.stopped:
            return
        self._internal_component.send(stanza.to_xml_element())

    def send_muc_stanza(self, stanza):
        """Send a stanza through the MUC component; dropped once stopped."""
        if self.stopped:
            return
        self._muc_component.send(stanza.to_xml_element())

    def handle_notification(self, notification):
        """Dispatch a notification to the matching ``_NH_<name>`` method, if any."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _deliver_to_muc_session(self, message):
        """Hand a MUC message to the incoming session of its (room, sender) pair."""
        muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host)
        try:
            session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)]
        except KeyError:
            # Ignore groupchat messages if there was no session created
            pass
        else:
            session.channel.send(message)

    def _dispatch_to_jingle_session(self, notification):
        """Forward a Jingle notification to the session with a matching sid, if any."""
        stanza = notification.data.stanza
        try:
            session = self.jingle_session_manager.sessions[stanza.jingle.sid]
        except KeyError:
            return
        session.handle_notification(notification)

    # Process message stanzas

    def _NH_XMPPGotChatMessage(self, notification):
        message = notification.data.message
        try:
            session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)]
        except KeyError:
            # No session yet: re-post with the manager as sender
            notification.center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data)
        else:
            session.channel.send(message)

    def _NH_XMPPGotNormalMessage(self, notification):
        notification.center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data)

    def _NH_XMPPGotComposingIndication(self, notification):
        composing_indication = notification.data.composing_indication
        try:
            session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)]
        except KeyError:
            notification.center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data)
        else:
            session.channel.send(composing_indication)

    def _NH_XMPPGotErrorMessage(self, notification):
        error_message = notification.data.error_message
        try:
            session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)]
        except KeyError:
            notification.center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data)
        else:
            session.channel.send(error_message)

    def _NH_XMPPGotReceipt(self, notification):
        receipt = notification.data.receipt
        try:
            session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)]
        except KeyError:
            pass
        else:
            session.channel.send(receipt)

    # Process presence stanzas

    def _NH_XMPPGotPresenceAvailability(self, notification):
        stanza = notification.data.presence_stanza
        if stanza.recipient.uri.resource is not None:
            # Skip directed presence
            return
        sender_uri = stanza.sender.uri
        sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host)
        try:
            subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)]
        except KeyError:
            # Ignore incoming presence stanzas if there is no subscription
            pass
        else:
            subscription.channel.send(stanza)

    def _NH_XMPPGotPresenceSubscriptionStatus(self, notification):
        stanza = notification.data.presence_stanza
        if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None:
            # Skip directed presence
            return
        if stanza.type in ('subscribed', 'unsubscribed'):
            try:
                subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)]
            except KeyError:
                pass
            else:
                subscription.channel.send(stanza)
        elif stanza.type in ('subscribe', 'unsubscribe'):
            try:
                subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)]
            except KeyError:
                # Only a new 'subscribe' is announced; a stray 'unsubscribe' is dropped
                if stanza.type == 'subscribe':
                    notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza))
            else:
                subscription.channel.send(stanza)

    def _NH_XMPPGotPresenceProbe(self, notification):
        stanza = notification.data.presence_stanza
        if stanza.recipient.uri.resource is not None:
            # Skip directed presence
            return
        sender_uri = stanza.sender.uri
        sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host)
        try:
            subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)]
        except KeyError:
            notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza))
        else:
            subscription.channel.send(stanza)

    # Process muc stanzas

    def _NH_XMPPMucGotGroupChat(self, notification):
        self._deliver_to_muc_session(notification.data.message)

    def _NH_XMPPMucGotSubject(self, notification):
        self._deliver_to_muc_session(notification.data.message)

    def _NH_XMPPMucGotPresenceAvailability(self, notification):
        stanza = notification.data.presence_stanza
        if not stanza.sender.uri.resource:
            return
        muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host)
        try:
            session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)]
        except KeyError:
            if stanza.available:
                notification.center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza))
            else:
                notification.center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza))
        else:
            session.channel.send(stanza)

    def _NH_XMPPMucGotInvitation(self, notification):
        invitation = notification.data.invitation
        data = NotificationData(sender=invitation.sender, recipient=invitation.recipient, participant=invitation.invited_user)
        notification.center.post_notification('XMPPGotMucAddParticipantRequest', sender=self, data=data)

    # Jingle

    def _NH_XMPPGotJingleSessionInitiate(self, notification):
        stanza = notification.data.stanza
        try:
            self.jingle_session_manager.sessions[stanza.jingle.sid]
        except KeyError:
            # Unknown sid: create the incoming session and signal ringing
            session = JingleSession(notification.data.protocol)
            session.init_incoming(stanza)
            session.send_ring_indication()

    def _NH_XMPPGotJingleSessionTerminate(self, notification):
        self._dispatch_to_jingle_session(notification)

    def _NH_XMPPGotJingleSessionInfo(self, notification):
        self._dispatch_to_jingle_session(notification)

    def _NH_XMPPGotJingleSessionAccept(self, notification):
        self._dispatch_to_jingle_session(notification)

    def _NH_XMPPGotJingleDescriptionInfo(self, notification):
        self._dispatch_to_jingle_session(notification)

    def _NH_XMPPGotJingleTransportInfo(self, notification):
        self._dispatch_to_jingle_session(notification)
diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py
index 56205d9..a361cbb 100644
--- a/sylk/applications/xmppgateway/xmpp/protocols.py
+++ b/sylk/applications/xmppgateway/xmpp/protocols.py
@@ -1,380 +1,387 @@
from application.notification import NotificationCenter, NotificationData
from twisted.internet import defer, reactor
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.jid import JID
from wokkel import disco, muc, ping, xmppim
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI
from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza, ChatComposingIndication
-from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, GroupChatMessage, IncomingInvitationMessage, NormalMessage, MessageReceipt
+from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, GroupChatMessage, GroupChatSubject, IncomingInvitationMessage, NormalMessage, MessageReceipt
from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence, SubscriptionPresence, ProbePresence, MUCAvailabilityPresence
from sylk.applications.xmppgateway.xmpp.stanzas import jingle
__all__ = 'DiscoProtocol', 'JingleProtocol', 'MessageProtocol', 'MUCServerProtocol', 'MUCPresenceProtocol', 'PresenceProtocol'
class MessageProtocol(xmppim.MessageProtocol):
    """Turn received ``<message/>`` stanzas into gateway notifications.

    Depending on the stanza, one of the following notifications is posted with
    ``self.parent`` as sender: XMPPGotErrorMessage, XMPPGotChatMessage,
    XMPPGotNormalMessage, XMPPGotComposingIndication or XMPPGotReceipt.
    """

    messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error'

    def _onMessage(self, message):
        """Normalise the message type and pass unhandled stanzas to onMessage."""
        if message.handled:
            return
        messageType = message.getAttribute("type")
        if messageType not in self.messageTypes:
            # Types outside messageTypes are processed as 'normal'
            message["type"] = 'normal'
        self.onMessage(message)

    @staticmethod
    def _first_child_in(msg, namespace):
        """Return the first child element of msg whose uri is namespace, or None."""
        return next((child for child in msg.elements() if child.uri == namespace), None)

    def onMessage(self, msg):
        """Classify a message stanza and post the matching notification.

        A message is considered empty when it has neither a body nor an html
        child. Non-empty messages of type None/'normal'/'chat' become chat or
        normal messages; empty 'chat' messages carrying a chat state become
        composing indications; any other empty message carrying a 'received'
        receipt element and an id becomes a receipt.
        """
        notification_center = NotificationCenter()
        sender_uri = FrozenURI.parse('xmpp:'+msg['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+msg['to'])
        recipient = Identity(recipient_uri)
        msg_type = msg.getAttribute('type')
        msg_id = msg.getAttribute('id', None)
        is_empty = msg.body is None and msg.html is None

        if msg_type == 'error':
            error_type = msg.error['type']
            conditions = [(child.name, child.defaultUri) for child in msg.error.elements()]
            error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg_id)
            notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message))
            return

        if msg_type in (None, 'normal', 'chat') and not is_empty:
            body = None
            html_body = None
            if msg.html is not None:
                html_body = msg.html.toXml()
            if msg.body is not None:
                body = unicode(msg.body)
            # A receipt is requested only when the receipts element is a 'request'
            receipt_elem = self._first_child_in(msg, RECEIPTS_NS)
            use_receipt = receipt_elem is not None and receipt_elem.name == u'request'
            if msg_type == 'chat':
                message = ChatMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt)
                notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message))
            else:
                message = NormalMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt)
                notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message))
            return

        # Check if it's a composing indication
        if msg_type == 'chat' and is_empty:
            # One scan over the children is enough; the element name is the chat state
            state_elem = self._first_child_in(msg, CHATSTATES_NS)
            if state_elem is not None:
                composing_indication = ChatComposingIndication(sender, recipient, state_elem.name, id=msg_id)
                notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication))
                return

        # Check if it's a receipt acknowledgement
        if is_empty:
            receipt_elem = self._first_child_in(msg, RECEIPTS_NS)
            if receipt_elem is not None and receipt_elem.name == u'received' and msg_id is not None:
                receipt = MessageReceipt(sender, recipient, msg_id)
                notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt))
class PresenceProtocol(xmppim.PresenceProtocol):
    """Turn received presence stanzas into gateway notifications.

    Every handler posts its notification with ``self.parent`` as sender and
    the parsed stanza as ``presence_stanza`` in the notification data.
    """

    @staticmethod
    def _endpoints(stanza):
        """Return (sender identity, recipient identity, id attribute) of stanza."""
        element = stanza.element
        origin = Identity(FrozenURI.parse('xmpp:'+element['from']))
        target = Identity(FrozenURI.parse('xmpp:'+element['to']))
        return origin, target, element.getAttribute('id')

    def _post_presence(self, name, presence_stanza):
        """Post the notification called name carrying presence_stanza."""
        NotificationCenter().post_notification(name, sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def availableReceived(self, stanza):
        origin, target, stanza_id = self._endpoints(stanza)
        presence = AvailabilityPresence(origin, target, available=True, show=stanza.show, statuses=stanza.statuses, id=stanza_id)
        self._post_presence('XMPPGotPresenceAvailability', presence)

    def unavailableReceived(self, stanza):
        origin, target, stanza_id = self._endpoints(stanza)
        presence = AvailabilityPresence(origin, target, available=False, id=stanza_id)
        self._post_presence('XMPPGotPresenceAvailability', presence)

    def _process_subscription_stanza(self, stanza):
        """Common handling for the four subscription-related presence types."""
        origin, target, stanza_id = self._endpoints(stanza)
        subscription_type = stanza.element.getAttribute('type')
        presence = SubscriptionPresence(origin, target, subscription_type, id=stanza_id)
        self._post_presence('XMPPGotPresenceSubscriptionStatus', presence)

    def subscribedReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def unsubscribedReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def subscribeReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def unsubscribeReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def probeReceived(self, stanza):
        origin, target, stanza_id = self._endpoints(stanza)
        presence = ProbePresence(origin, target, id=stanza_id)
        self._post_presence('XMPPGotPresenceProbe', presence)
class MUCServerProtocol(xmppim.BasePresenceProtocol):
    """Handle MUC presence, groupchat messages and invitations.

    Posts XMPPMucGotGroupChat, XMPPMucGotSubject, XMPPMucGotInvitation and
    XMPPMucGotPresenceAvailability notifications with ``self.parent`` as sender.
    """

    messageTypes = None, 'normal', 'chat', 'groupchat'

    presenceTypeParserMap = {'available': muc.UserPresence,
                             'unavailable': muc.UserPresence}

    def connectionInitialized(self):
        """Register the stream observers for MUC presence and all messages."""
        self.xmlstream.addObserver('/presence/x[@xmlns="%s"]' % muc.NS_MUC, self._onPresence)
        self.xmlstream.addObserver('/message', self._onMessage)

    def _onMessage(self, message):
        """Dispatch a message stanza to onGroupChat or onInvitation."""
        if message.handled:
            return
        messageType = message.getAttribute("type")
        if messageType == 'error':
            return
        if messageType not in self.messageTypes:
            message['type'] = 'normal'
        if messageType == 'groupchat':
            self.onGroupChat(message)
        else:
            to_uri = FrozenURI.parse('xmpp:'+message['to'])
            if to_uri.host in self.parent.domains:
                # Check if it's an invitation
                if message.x is not None and message.x.invite is not None and message.x.invite.uri == MUC_USER_NS:
                    self.onInvitation(message)
                else:
                    # TODO: give error, private messages not supported
                    pass

    def onGroupChat(self, msg):
        """Post a groupchat message or, when there is no content, a subject change.

        A stanza with a body or html part is reported as XMPPMucGotGroupChat.
        Otherwise, if it carries a subject element, it is reported as
        XMPPMucGotSubject. Stanzas with neither are ignored.
        """
        sender_uri = FrozenURI.parse('xmpp:'+msg['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+msg['to'])
        recipient = Identity(recipient_uri)
        body = None
        html_body = None
        subject = None

        if msg.html is not None:
            html_body = msg.html.toXml()
        if msg.body is not None:
            body = unicode(msg.body)
        if msg.subject is not None:
            # Convert to text like the body, instead of handing over the XML element
            subject = unicode(msg.subject)

        msg_id = msg.getAttribute('id', None)
        if body or html_body:
            message = GroupChatMessage(sender, recipient, body, html_body, id=msg_id)
            NotificationCenter().post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message))
        elif subject is not None:
            # A present-but-empty subject is still reported (presumably it clears the subject -- confirm with the consumer)
            message = GroupChatSubject(sender, recipient, subject, id=msg_id)
            NotificationCenter().post_notification('XMPPMucGotSubject', sender=self.parent, data=NotificationData(message=message))

    def onInvitation(self, msg):
        """Post XMPPMucGotInvitation for a mediated invitation stanza."""
        sender_uri = FrozenURI.parse('xmpp:'+msg['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+msg['to'])
        recipient = Identity(recipient_uri)
        invited_user_uri = FrozenURI.parse('xmpp:'+msg.x.invite['to'])
        invited_user = Identity(invited_user_uri)
        if msg.x.invite.reason is not None and msg.x.invite.reason.uri == MUC_USER_NS:
            reason = unicode(msg.x.invite.reason)
        else:
            reason = None
        invitation = IncomingInvitationMessage(sender, recipient, invited_user=invited_user, reason=reason, id=msg.getAttribute('id', None))
        NotificationCenter().post_notification('XMPPMucGotInvitation', sender=self.parent, data=NotificationData(invitation=invitation))

    def availableReceived(self, stanza):
        sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to'])
        recipient = Identity(recipient_uri)
        id = stanza.element.getAttribute('id')
        presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id)
        NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def unavailableReceived(self, stanza):
        sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to'])
        recipient = Identity(recipient_uri)
        id = stanza.element.getAttribute('id')
        presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id)
        NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))
class DiscoProtocol(disco.DiscoHandler):
    """Answer service discovery (disco#info / disco#items) requests."""

    @staticmethod
    def _media_features():
        """Return fresh DiscoFeature objects for the offer/answer, coin and Jingle features.

        The RTP video feature was commented out in the original list and is
        therefore not part of it.
        """
        namespaces = ('urn:ietf:rfc:3264',
                      'urn:xmpp:coin',
                      jingle.NS_JINGLE,
                      jingle.NS_JINGLE_APPS_RTP,
                      jingle.NS_JINGLE_APPS_RTP_AUDIO,
                      jingle.NS_JINGLE_ICE_UDP_TRANSPORT,
                      jingle.NS_JINGLE_RAW_UDP_TRANSPORT)
        return [disco.DiscoFeature(namespace) for namespace in namespaces]

    def info(self, requestor, target, nodeIdentifier):
        """
        Gather data for a disco info request.

        @param requestor: The entity that sent the request.
        @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param target: The entity the request was sent to.
        @type target: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The optional node being queried, or C{''}.
        @type nodeIdentifier: C{unicode}
        @return: Deferred firing with the list of identities and features, or
                 failing with a C{service-unavailable} StanzaError when the
                 target host is neither a regular nor a MUC domain.
        @rtype: L{defer.Deferred}
        """
        xmpp_manager = self.parent.manager
        served_hosts = xmpp_manager.domains | xmpp_manager.muc_domains
        if target.host not in served_hosts:
            return defer.fail(StanzaError('service-unavailable'))

        elements = [disco.DiscoFeature(disco.NS_DISCO_INFO),
                    disco.DiscoFeature(disco.NS_DISCO_ITEMS),
                    disco.DiscoFeature('http://sylkserver.com')]

        if target.host in xmpp_manager.muc_domains:
            elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service'))
            elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc'))
            elements.extend(self._media_features())
            if target.user:
                # We can't say much more here, because the actual conference may end up on a different server
                elements.extend([disco.DiscoFeature('muc_temporary'),
                                 disco.DiscoFeature('muc_unmoderated')])
            return defer.succeed(elements)

        elements.append(disco.DiscoFeature(ping.NS_PING))
        if target.user:
            elements.append(disco.DiscoIdentity('client', 'pc'))
            elements.append(disco.DiscoFeature('http://jabber.org/protocol/caps'))
            elements.append(disco.DiscoFeature('http://jabber.org/protocol/chatstates'))
            elements.extend(self._media_features())
        else:
            elements.append(disco.DiscoIdentity('gateway', 'simple', 'SylkServer'))
            elements.append(disco.DiscoIdentity('server', 'im', 'SylkServer'))
        return defer.succeed(elements)

    def items(self, requestor, target, nodeIdentifier):
        """
        Gather data for a disco items request.

        @param requestor: The entity that sent the request.
        @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param target: The entity the request was sent to.
        @type target: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The optional node being queried, or C{''}.
        @type nodeIdentifier: C{unicode}
        @return: Deferred firing with a list that holds the MUC service item
                 when a bare regular domain is queried, and is empty otherwise.
        @rtype: L{defer.Deferred}
        """
        xmpp_manager = self.parent.manager
        if target.user or target.host not in xmpp_manager.domains:
            return defer.succeed([])
        muc_jid = JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target.host))
        return defer.succeed([disco.DiscoItem(muc_jid, name='Multi-User Chat')])
class JingleProtocol(jingle.JingleHandler):
    """Relay received Jingle requests as notifications.

    Each handler only schedules a notification (sender is ``self.parent``,
    the request travels as ``stanza`` in the data) and returns.
    """

    # Functions here need to return immediately so that the IQ result is sent, so schedule them in the reactor
    # TODO: review and remove this, just post notifications?

    def _post_later(self, name, **data):
        """Schedule posting of notification name with data on the next reactor iteration."""
        reactor.callLater(0, NotificationCenter().post_notification,
                          name,
                          sender=self.parent,
                          data=NotificationData(**data))

    def onSessionInitiate(self, request):
        # Only session-initiate also hands over the protocol instance
        self._post_later('XMPPGotJingleSessionInitiate', stanza=request, protocol=self)

    def onSessionTerminate(self, request):
        self._post_later('XMPPGotJingleSessionTerminate', stanza=request)

    def onSessionAccept(self, request):
        self._post_later('XMPPGotJingleSessionAccept', stanza=request)

    def onSessionInfo(self, request):
        self._post_later('XMPPGotJingleSessionInfo', stanza=request)

    def onDescriptionInfo(self, request):
        self._post_later('XMPPGotJingleDescriptionInfo', stanza=request)

    def onTransportInfo(self, request):
        self._post_later('XMPPGotJingleTransportInfo', stanza=request)
class MUCPresenceProtocol(xmppim.PresenceProtocol):
    """Protocol implementation to handle presence subscription to MUC URIs.

    Subscriptions are accepted unconditionally and the addressed URI is
    always reported as available.
    """

    def subscribeReceived(self, stanza):
        """Accept the subscription request and answer with available presence."""
        self.subscribed(stanza.sender, sender=stanza.recipient)
        self.send_available(stanza)

    def unsubscribeReceived(self, stanza):
        """Confirm the unsubscription request."""
        self.unsubscribed(stanza.sender, sender=stanza.recipient)

    def probeReceived(self, stanza):
        """Answer a presence probe with available presence."""
        self.send_available(stanza)

    def send_available(self, stanza):
        """Send available presence from the stanza's addressee back to its sender."""
        element = stanza.element
        origin = Identity(FrozenURI.parse('xmpp:'+element['from']))
        addressee = Identity(FrozenURI.parse('xmpp:'+element['to']))
        # The reply swaps the two roles of the received stanza
        reply = AvailabilityPresence(sender=addressee, recipient=origin)
        self.send(reply.to_xml_element())
diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py
index 458566f..7828150 100644
--- a/sylk/applications/xmppgateway/xmpp/session.py
+++ b/sylk/applications/xmppgateway/xmpp/session.py
@@ -1,215 +1,217 @@
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import WriteOnceAttribute
from application.python.types import Singleton
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implements
-from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, MUCAvailabilityPresence
+from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, GroupChatSubject, MUCAvailabilityPresence
__all__ = 'XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager'
# Chat sessions
class XMPPChatSession(object):
    """One-to-one XMPP chat session between a local and a remote identity.

    Incoming stanzas are expected on ``channel`` and are processed by a green
    thread started in start(). Outgoing messages that carry an id are tracked
    in ``pending_receipts`` (message id -> timeout timer) until a receipt or
    error arrives, or the 30 second timer fires.
    """

    local_identity = WriteOnceAttribute()
    remote_identity = WriteOnceAttribute()

    def __init__(self, local_identity, remote_identity):
        self.local_identity = local_identity
        self.remote_identity = remote_identity
        self.state = None
        self.pending_receipts = {}
        self.channel = coros.queue()
        self._proc = None
        # Imported here rather than at module level (presumably to avoid a circular import)
        from sylk.applications.xmppgateway.xmpp import XMPPManager
        self.xmpp_manager = XMPPManager()

    def start(self):
        """Announce the session and start processing the channel."""
        NotificationCenter().post_notification('XMPPChatSessionDidStart', sender=self)
        self._proc = proc.spawn(self._run)
        self.state = 'started'

    def end(self):
        """End the session locally.

        Sends a 'gone' chat state, fails all pending receipts, stops the
        processing green thread and posts XMPPChatSessionDidEnd. Does nothing
        when the session is not running (never started or already ended).
        """
        if self._proc is None:
            # Nothing to tear down; without this guard kill() would be called on None
            return
        self.send_composing_indication('gone')
        self._clear_pending_receipts()
        self._proc.kill()
        self._proc = None
        NotificationCenter().post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='local'))
        self.state = 'terminated'

    def send_message(self, body, html_body, message_id=None, use_receipt=True):
        """Send a chat message; when message_id is given, track its delivery."""
        message = ChatMessage(self.local_identity, self.remote_identity, body, html_body, id=message_id, use_receipt=use_receipt)
        self.xmpp_manager.send_stanza(message)
        if message_id is not None:
            timer = reactor.callLater(30, self._receipt_timer_expired, message_id)
            self.pending_receipts[message_id] = timer
        NotificationCenter().post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message))

    def send_composing_indication(self, state, message_id=None, use_receipt=False):
        """Send a chat state notification; when message_id is given, track its delivery."""
        message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt)
        self.xmpp_manager.send_stanza(message)
        if message_id is not None:
            timer = reactor.callLater(30, self._receipt_timer_expired, message_id)
            self.pending_receipts[message_id] = timer
        NotificationCenter().post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message))

    def send_receipt_acknowledgement(self, receipt_id):
        """Acknowledge reception of the message identified by receipt_id."""
        message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id)
        self.xmpp_manager.send_stanza(message)

    def send_error(self, stanza, error_type, conditions):
        """Reply to stanza with an error stanza."""
        message = ErrorStanza.from_stanza(stanza, error_type, conditions)
        self.xmpp_manager.send_stanza(message)

    def _run(self):
        """Process items from the channel until the remote party sends 'gone'."""
        notification_center = NotificationCenter()
        while True:
            item = self.channel.wait()
            if isinstance(item, ChatMessage):
                notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=NotificationData(message=item))
            elif isinstance(item, ChatComposingIndication):
                if item.state == 'gone':
                    self._clear_pending_receipts()
                    notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='remote'))
                    self.state = 'terminated'
                    break
                else:
                    notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=NotificationData(message=item))
            elif isinstance(item, MessageReceipt):
                if item.receipt_id in self.pending_receipts:
                    timer = self.pending_receipts.pop(item.receipt_id)
                    timer.cancel()
                    notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=NotificationData(message_id=item.receipt_id))
            elif isinstance(item, ErrorStanza):
                if item.id in self.pending_receipts:
                    timer = self.pending_receipts.pop(item.id)
                    timer.cancel()
                    # TODO: translate cause
                    notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=item.id, code=503, reason='Service Unavailable'))
        self._proc = None

    def _receipt_timer_expired(self, message_id):
        """Report a delivery timeout for message_id if it is still pending."""
        if self.pending_receipts.pop(message_id, None) is None:
            # Already resolved or replaced; nothing left to report
            return
        NotificationCenter().post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout'))

    def _clear_pending_receipts(self):
        """Cancel every pending receipt timer and report each message as timed out."""
        notification_center = NotificationCenter()
        while self.pending_receipts:
            message_id, timer = self.pending_receipts.popitem()
            timer.cancel()
            notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout'))
class XMPPChatSessionManager(object):
    """Singleton registry of chat sessions.

    ``sessions`` maps (local URI, remote URI) to the session object and is
    kept up to date from the XMPPChatSessionDidStart / XMPPChatSessionDidEnd
    notifications.
    """

    __metaclass__ = Singleton

    implements(IObserver)

    def __init__(self):
        self.sessions = {}

    def start(self):
        """Subscribe to the chat session lifecycle notifications."""
        center = NotificationCenter()
        for name in ('XMPPChatSessionDidStart', 'XMPPChatSessionDidEnd'):
            center.add_observer(self, name=name)

    def stop(self):
        """Unsubscribe from the chat session lifecycle notifications."""
        center = NotificationCenter()
        for name in ('XMPPChatSessionDidStart', 'XMPPChatSessionDidEnd'):
            center.remove_observer(self, name=name)

    def handle_notification(self, notification):
        # Notifications without a matching _NH_ method are absorbed by Null
        getattr(self, '_NH_%s' % notification.name, Null)(notification)

    def _NH_XMPPChatSessionDidStart(self, notification):
        started = notification.sender
        key = (started.local_identity.uri, started.remote_identity.uri)
        self.sessions[key] = started

    def _NH_XMPPChatSessionDidEnd(self, notification):
        ended = notification.sender
        key = (ended.local_identity.uri, ended.remote_identity.uri)
        del self.sessions[key]
# MUC sessions
class XMPPIncomingMucSession(object):
    """Multi-user chat session between a local and a remote identity.

    Incoming stanzas are expected on ``channel`` and are processed by a green
    thread started in start().
    """

    local_identity = WriteOnceAttribute()
    remote_identity = WriteOnceAttribute()

    def __init__(self, local_identity, remote_identity):
        self.local_identity = local_identity
        self.remote_identity = remote_identity
        self.state = None
        self.channel = coros.queue()
        self._proc = None
        # Imported here rather than at module level (presumably to avoid a circular import)
        from sylk.applications.xmppgateway.xmpp import XMPPManager
        self.xmpp_manager = XMPPManager()

    def start(self):
        """Announce the session and start processing the channel."""
        NotificationCenter().post_notification('XMPPIncomingMucSessionDidStart', sender=self)
        self._proc = proc.spawn(self._run)
        self.state = 'started'

    def end(self):
        """End the session locally and post XMPPIncomingMucSessionDidEnd.

        Does nothing when the session is not running (never started or
        already ended).
        """
        if self._proc is None:
            # Nothing to tear down; without this guard kill() would be called on None
            return
        self._proc.kill()
        self._proc = None
        NotificationCenter().post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local'))
        self.state = 'terminated'

    def send_message(self, sender, body, html_body, message_id=None):
        """Send a groupchat message from sender to the remote identity."""
        # TODO: timestamp?
        message = GroupChatMessage(sender, self.remote_identity, body, html_body, id=message_id)
        self.xmpp_manager.send_muc_stanza(message)

    def _run(self):
        """Process items from the channel until an unavailable presence arrives."""
        notification_center = NotificationCenter()
        while True:
            item = self.channel.wait()
            if isinstance(item, GroupChatMessage):
                notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=NotificationData(message=item))
            elif isinstance(item, GroupChatSubject):
                notification_center.post_notification('XMPPIncomingMucSessionSubject', sender=self, data=NotificationData(message=item))
            elif isinstance(item, MUCAvailabilityPresence):
                if item.available:
                    # The resource part of the addressed URI is reported as the nickname
                    nickname = item.recipient.uri.resource
                    notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=NotificationData(stanza=item, nickname=nickname))
                else:
                    # NOTE(review): reported with originator='local' although it is triggered by a received stanza -- confirm intent
                    notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local'))
                    self.state = 'terminated'
                    break
        self._proc = None
class XMPPMucSessionManager(object):
    """Singleton registry of MUC sessions.

    ``incoming`` maps (local URI, remote URI) to the incoming session object
    and is kept up to date from the XMPPIncomingMucSessionDidStart /
    XMPPIncomingMucSessionDidEnd notifications. ``outgoing`` is only
    initialised here.
    """

    __metaclass__ = Singleton

    implements(IObserver)

    def __init__(self):
        self.incoming = {}
        self.outgoing = {}

    def start(self):
        """Subscribe to the incoming MUC session lifecycle notifications."""
        center = NotificationCenter()
        for name in ('XMPPIncomingMucSessionDidStart', 'XMPPIncomingMucSessionDidEnd'):
            center.add_observer(self, name=name)

    def stop(self):
        """Unsubscribe from the incoming MUC session lifecycle notifications."""
        center = NotificationCenter()
        for name in ('XMPPIncomingMucSessionDidStart', 'XMPPIncomingMucSessionDidEnd'):
            center.remove_observer(self, name=name)

    def handle_notification(self, notification):
        # Notifications without a matching _NH_ method are absorbed by Null
        getattr(self, '_NH_%s' % notification.name, Null)(notification)

    def _NH_XMPPIncomingMucSessionDidStart(self, notification):
        started = notification.sender
        key = (started.local_identity.uri, started.remote_identity.uri)
        self.incoming[key] = started

    def _NH_XMPPIncomingMucSessionDidEnd(self, notification):
        ended = notification.sender
        key = (ended.local_identity.uri, ended.remote_identity.uri)
        del self.incoming[key]
diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
index f250137..c26d864 100644
--- a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
+++ b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
@@ -1,284 +1,295 @@
import hashlib
from twisted.words.xish import domish
from sylk import __version__ as SYLK_VERSION
from sylk.applications.xmppgateway.util import html2text
# XML namespaces used when building and inspecting stanzas
CHATSTATES_NS = 'http://jabber.org/protocol/chatstates'
RECEIPTS_NS = 'urn:xmpp:receipts'
STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas'
XML_NS = 'http://www.w3.org/XML/1998/namespace'
MUC_NS = 'http://jabber.org/protocol/muc'
MUC_USER_NS = MUC_NS + '#user'
CAPS_NS = 'http://jabber.org/protocol/caps'
# Values for the capabilities <c/> element added by AvailabilityPresence:
# SYLK_HASH (SHA-1 hex digest of 'SylkServer-<version>') is sent as 'ver',
# SYLK_CAPS is space-joined into 'ext' when it is not empty
SYLK_HASH = hashlib.sha1('SylkServer-%s' % SYLK_VERSION).hexdigest()
SYLK_CAPS = []
class BaseStanza(object):
    """Common base of all stanzas: sender, recipient, optional id and type."""

    stanza_type = None     # to be defined by subclasses
    type = None

    def __init__(self, sender, recipient, id=None):
        self.sender, self.recipient, self.id = sender, recipient, id

    def to_xml_element(self):
        """Return the stanza as an XML element with from/to and optional type/id attributes."""
        element = domish.Element((None, self.stanza_type))
        for attribute, party in (('from', self.sender), ('to', self.recipient)):
            element[attribute] = unicode(party.uri.as_xmpp_jid())
        # A falsy type (None or empty) produces no 'type' attribute
        if self.type:
            element['type'] = self.type
        if self.id is not None:
            element['id'] = self.id
        return element
class ErrorStanza(object):
    """
    Stanza representing an error of another stanza. It's not a base stanza type on its own.

    ``conditions`` is an iterable of (condition name, namespace) pairs; each
    pair becomes one child of the ``<error/>`` element.
    """

    def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None):
        self.stanza_type = stanza_type
        self.sender = sender
        self.recipient = recipient
        self.id = id
        self.conditions = conditions
        self.error_type = error_type

    @classmethod
    def from_stanza(cls, stanza, error_type, conditions):
        """Build the error reply for stanza, keeping its stanza type and id."""
        # In error stanzas sender and recipient are swapped
        return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id)

    def to_xml_element(self):
        """Return the error stanza as an XML element of type 'error'."""
        xml_element = domish.Element((None, self.stanza_type))
        xml_element['from'] = unicode(self.sender.uri.as_xmpp_jid())
        xml_element['to'] = unicode(self.recipient.uri.as_xmpp_jid())
        xml_element['type'] = 'error'
        if self.id is not None:
            xml_element['id'] = self.id
        error_element = domish.Element((None, 'error'))
        error_element['type'] = self.error_type
        # Plain loop: the children are added for their side effect only
        for condition, ns in self.conditions:
            error_element.addChild(domish.Element((ns, condition)))
        xml_element.addChild(error_element)
        return xml_element
class BaseMessageStanza(BaseStanza):
    """Base of all ``<message/>`` stanzas, with optional plain and HTML bodies."""

    stanza_type = 'message'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        super(BaseMessageStanza, self).__init__(sender, recipient, id=id)
        self.use_receipt = use_receipt
        # With only an HTML body given, the plain text body is derived from it
        if body is None and html_body is not None:
            body = html2text(html_body)
        self.body = body
        self.html_body = html_body

    def to_xml_element(self):
        """Return the message element with receipt request, body and html children as applicable."""
        element = super(BaseMessageStanza, self).to_xml_element()
        # A receipt is requested only for messages with an id sent to a URI that has a resource
        wants_receipt = self.id is not None and self.recipient.uri.resource is not None and self.use_receipt
        if wants_receipt:
            element.addElement('request', defaultUri=RECEIPTS_NS)
        for tag, content in (('body', self.body), ('html', self.html_body)):
            if content is not None:
                element.addElement(tag, content=content)
        return element
class NormalMessage(BaseMessageStanza):
    """Message without a type attribute; requires a body or an HTML body."""

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        if all(part is None for part in (body, html_body)):
            raise ValueError('either body or html_body need to be set')
        super(NormalMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)
class ChatMessage(BaseMessageStanza):
    """Message of type 'chat'; requires a body or an HTML body."""

    type = 'chat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=True):
        if all(part is None for part in (body, html_body)):
            raise ValueError('either body or html_body need to be set')
        super(ChatMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)

    def to_xml_element(self):
        """Return the message element with an 'active' chat state appended."""
        element = super(ChatMessage, self).to_xml_element()
        element.addElement('active', defaultUri=CHATSTATES_NS)
        return element
class ChatComposingIndication(BaseMessageStanza):
    """Body-less 'chat' message that carries a chat state element named after ``state``."""

    type = 'chat'

    def __init__(self, sender, recipient, state, id=None, use_receipt=False):
        super(ChatComposingIndication, self).__init__(sender, recipient, id=id, use_receipt=use_receipt)
        self.state = state

    def to_xml_element(self):
        """Return the message element with the chat state element appended."""
        element = super(ChatComposingIndication, self).to_xml_element()
        element.addElement(self.state, defaultUri=CHATSTATES_NS)
        return element
class GroupChatMessage(BaseMessageStanza):
    """Message of type 'groupchat'; requires a body or an HTML body and never requests a receipt."""

    type = 'groupchat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None):
        # TODO: add timestamp
        if all(part is None for part in (body, html_body)):
            raise ValueError('either body or html_body need to be set')
        super(GroupChatMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=False)
class GroupChatSubject(BaseMessageStanza):
    """Message of type 'groupchat' that carries a room subject.

    The subject is available both as ``subject`` and, for code that reads it
    from there, as ``body``. It is serialised as a ``<subject/>`` child.
    """

    type = 'groupchat'
    # TODO: add delay

    def __init__(self, sender, recipient, subject, id=None):
        # TODO: add timestamp
        if subject is None:
            raise ValueError('subject need to be set')
        super(GroupChatSubject, self).__init__(sender, recipient, subject, None, id, False)
        self.subject = subject

    def to_xml_element(self):
        """Return the message element with a single ``<subject/>`` child."""
        # BaseMessageStanza.to_xml_element is bypassed on purpose: it would emit the subject as <body/>
        xml_element = BaseStanza.to_xml_element(self)
        xml_element.addElement('subject', content=self.subject)
        return xml_element


class MessageReceipt(BaseMessageStanza):
    """Body-less message acknowledging reception of the message with id ``receipt_id``."""

    def __init__(self, sender, recipient, receipt_id, id=None):
        super(MessageReceipt, self).__init__(sender, recipient, id=id, use_receipt=False)
        self.receipt_id = receipt_id

    def to_xml_element(self):
        """Return the message element with a 'received' receipt child."""
        element = super(MessageReceipt, self).to_xml_element()
        received = domish.Element((RECEIPTS_NS, 'received'))
        received['id'] = self.receipt_id
        element.addChild(received)
        return element
class BasePresenceStanza(BaseStanza):
    """Base of all ``<presence/>`` stanzas."""

    stanza_type = 'presence'
class IncomingInvitationMessage(BaseMessageStanza):
    """MUC invitation naming the invited user (``to`` attribute of ``<invite/>``)."""

    def __init__(self, sender, recipient, invited_user, reason=None, id=None):
        super(IncomingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.invited_user = invited_user
        self.reason = reason

    def to_xml_element(self):
        """Return the message element with the MUC user ``<x><invite/></x>`` payload."""
        element = super(IncomingInvitationMessage, self).to_xml_element()
        payload = element.addElement((MUC_USER_NS, 'x'))
        invite = payload.addElement('invite')
        invite['to'] = unicode(self.invited_user.uri.as_xmpp_jid())
        # An empty or missing reason produces no <reason/> child
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
class OutgoingInvitationMessage(BaseMessageStanza):
    """MUC invitation naming its originator (``from`` attribute of ``<invite/>``)."""

    def __init__(self, sender, recipient, originator, reason=None, id=None):
        super(OutgoingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.originator = originator
        self.reason = reason

    def to_xml_element(self):
        """Return the message element with the MUC user ``<x><invite/></x>`` payload."""
        element = super(OutgoingInvitationMessage, self).to_xml_element()
        payload = element.addElement((MUC_USER_NS, 'x'))
        invite = payload.addElement('invite')
        invite['from'] = unicode(self.originator.uri.as_xmpp_jid())
        # An empty or missing reason produces no <reason/> child
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
class AvailabilityPresence(BasePresenceStanza):
    """Available / unavailable presence with optional show, priority and statuses.

    ``statuses`` maps a language (or None for the default) to the status text.
    Setting ``available`` also updates ``type``: None when available,
    'unavailable' otherwise.
    """

    def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None):
        super(AvailabilityPresence, self).__init__(sender, recipient, id=id)
        self.available = available
        self.show = show
        self.priority = priority
        self.statuses = statuses or {}

    @property
    def available(self):
        return self.__dict__['available']

    @available.setter
    def available(self, available):
        # Keep the stanza type in sync with the availability flag
        if available:
            self.type = None
        else:
            self.type = 'unavailable'
        self.__dict__['available'] = available

    @property
    def status(self):
        """The status without language, else any other status, else None."""
        status = self.statuses.get(None)
        if status is None:
            try:
                status = next(self.statuses.itervalues())
            except StopIteration:
                pass
        return status

    def to_xml_element(self):
        """Return the presence element with show, priority, capabilities and status children."""
        # Start the lookup after this class so no class in the hierarchy is skipped
        xml_element = super(AvailabilityPresence, self).to_xml_element()
        if self.available:
            if self.show is not None:
                xml_element.addElement('show', content=self.show)
            if self.priority != 0:
                xml_element.addElement('priority', content=unicode(self.priority))
            caps = xml_element.addElement('c', defaultUri=CAPS_NS)
            caps['node'] = 'http://sylkserver.com'
            caps['hash'] = 'sha-1'
            caps['ver'] = SYLK_HASH
            if SYLK_CAPS:
                caps['ext'] = ' '.join(SYLK_CAPS)
        for lang, text in self.statuses.iteritems():
            status = xml_element.addElement('status', content=text)
            # The default status (falsy language key) gets no xml:lang attribute
            if lang:
                status[(XML_NS, 'lang')] = lang
        return xml_element
class SubscriptionPresence(BasePresenceStanza):
    """Presence stanza whose type is supplied by the caller."""

    def __init__(self, sender, recipient, type, id=None):
        super(SubscriptionPresence, self).__init__(sender, recipient, id=id)
        # Instance attribute overriding the class-level default type
        self.type = type
class ProbePresence(BasePresenceStanza):
    """Presence stanza of type 'probe'."""

    type = 'probe'
class MUCAvailabilityPresence(AvailabilityPresence):
    """Availability presence extended with the MUC user ``<x/>`` payload.

    ``affiliation`` defaults to 'member' and ``role`` to 'participant' when
    not given; ``muc_statuses`` is a list of status codes.
    """

    def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None, affiliation=None, jid=None, role=None, muc_statuses=None):
        super(MUCAvailabilityPresence, self).__init__(sender, recipient, available, show, statuses, priority, id)
        self.affiliation = affiliation or 'member'
        self.role = role or 'participant'
        self.muc_statuses = muc_statuses or []
        self.jid = jid

    def to_xml_element(self):
        """Return the presence element with the MUC item and status code children."""
        element = super(MUCAvailabilityPresence, self).to_xml_element()
        payload = element.addElement('x', defaultUri=MUC_USER_NS)
        item = payload.addElement('item')
        # Only truthy values end up as attributes of <item/>
        for attribute, value in (('affiliation', self.affiliation), ('role', self.role)):
            if value:
                item[attribute] = value
        if self.jid:
            item['jid'] = unicode(self.jid.uri.as_xmpp_jid())
        for code in self.muc_statuses:
            payload.addElement('status')['code'] = code
        return element
class MUCErrorPresence(ErrorStanza):
    """Error stanza that additionally carries an empty MUC user ``<x/>`` element."""

    def to_xml_element(self):
        element = super(MUCErrorPresence, self).to_xml_element()
        element.addElement('x', defaultUri=MUC_USER_NS)
        return element

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:31 AM (14 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408721
Default Alt Text
(77 KB)

Event Timeline