Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py
index d44aca3..324b72f 100644
--- a/sylk/applications/xmppgateway/__init__.py
+++ b/sylk/applications/xmppgateway/__init__.py
@@ -1,395 +1,497 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import os
from application.notification import IObserver, NotificationCenter
from application.python import Null
-from sipsimple.core import SIPURI
+from sipsimple.core import SIPURI, SIPCoreError
from sipsimple.payloads import ParserError
from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage
from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError
from sipsimple.threading.green import run_in_green_thread
from zope.interface import implements
from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource
from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler
from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler
-from sylk.applications.xmppgateway.muc import X2SMucHandler
+from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler
+from sylk.applications.xmppgateway.util import format_uri
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession
from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage
log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1])
class XMPPGatewayApplication(object):
__metaclass__ = SylkApplication
implements(ISylkApplication, IObserver)
__appname__ = 'xmppgateway'
def __init__(self):
# Create the XMPP manager and the registries used to track in-flight handlers.
self.xmpp_manager = XMPPManager()
# (sip_leg_uri, xmpp_leg_uri) -> ChatSessionHandler that has not started yet
self.pending_sessions = {}
# ChatSessionHandlers whose session has started
self.chat_sessions = set()
self.s2x_muc_sessions = {}
# (xmpp uri, sip uri) -> X2SMucHandler
self.x2s_muc_sessions = {}
# (sip uri, xmpp uri) -> S2XPresenceHandler
self.s2x_presence_subscriptions = {}
# (xmpp uri, sip uri) -> X2SPresenceHandler
self.x2s_presence_subscriptions = {}
# (muc uri, invitee uri) -> invitation handler in progress, one dict per direction
+ self.s2x_muc_add_participant_handlers = {}
+ self.x2s_muc_add_participant_handlers = {}
def start(self):
    """Register as observer of the XMPP manager, then start the manager."""
    manager = self.xmpp_manager
    NotificationCenter().add_observer(self, sender=manager)
    manager.start()
def stop(self):
    """Stop observing the XMPP manager, then stop the manager."""
    manager = self.xmpp_manager
    NotificationCenter().remove_observer(self, sender=manager)
    manager.stop()
def incoming_session(self, session):
# Handle an incoming SIP session. A session coming from a MUC domain whose
# Contact carries 'isfocus' is treated as an invitation to join a multiparty
# chat; anything else is treated as a one-to-one chat session towards XMPP.
# NOTE(review): session.transfer_info is dereferenced without a None check in
# the invitation branch -- confirm it is always set for such sessions.
log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
# The stream object itself is not used below; the lookup only verifies that
# a chat stream was proposed.
try:
msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next()
except StopIteration:
log.msg('Session rejected: Only MSRP media is supported')
session.reject(488, 'Only MSRP media is supported')
return
+ # Check if this session is really an invitation to add a participant to a conference room / muc
+ if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters:
+ try:
+ referred_by_uri = SIPURI.parse(session.transfer_info.referred_by)
+ except SIPCoreError:
+ log.msg("SIP multiparty session invitation %s failed: invalid Referred-By header" % session._invitation.call_id)
+ session.reject(488)
+ return
+ muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host)
+ inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host)
+ recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host)
+ sender = Identity(muc_uri)
+ recipient = Identity(recipient_uri)
+ inviter = Identity(inviter_uri)
+ try:
+ handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)]
+ except KeyError:
+ handler = S2XMucInvitationHandler(session, sender, recipient, inviter)
+ self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler
+ NotificationCenter().add_observer(self, sender=handler)
+ handler.start()
+ else:
+ log.msg("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session._invitation.call_id,
+ format_uri(inviter_uri, 'sip'),
+ format_uri(recipient_uri, 'xmpp')))
+ session.reject(480)
+ return
+
# Check domain
if session.remote_identity.uri.host not in XMPPGatewayConfig.domains:
log.msg('Session rejected: From domain is not a local XMPP domain')
session.reject(606, 'Not Acceptable')
return
# Get URI representing the SIP side
# The Contact's 'gr' parameter is used as the resource when present,
# otherwise a generated resource is attached to the From URI.
contact_uri = session._invitation.remote_contact_header.uri
if contact_uri.parameters.get('gr') is not None:
sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr'))
else:
tmp = session.remote_identity.uri
sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource())
# Get URI representing the XMPP side
request_uri = session._invitation.request_uri
remote_resource = request_uri.parameters.get('gr', None)
if remote_resource is not None:
# A resource that cannot be decoded is kept as received
try:
remote_resource = decode_resource(remote_resource)
except (TypeError, UnicodeError):
pass
xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource)
# Only one pending session is allowed per (SIP leg, XMPP leg) pair
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
pass
else:
# There is another pending session with same identifiers, can't accept this one
log.msg('Session rejected: other session with same identifiers in progress')
session.reject(488)
return
# Create the handler and keep it as pending until it reports start or failure
sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name)
handler = ChatSessionHandler.new_from_sip_session(sip_identity, session)
NotificationCenter().add_observer(self, sender=handler)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
if xmpp_leg_uri.resource is not None:
# Incoming session target contained GRUU, so create XMPPChatSession immediately
xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri))
handler.xmpp_identity = xmpp_session.remote_identity
handler.xmpp_session = xmpp_session
def incoming_subscription(self, subscribe_request, data):
# Handle an incoming SIP SUBSCRIBE: validate headers, event and domain, then
# hand the request to the presence handler for this (SIP URI, XMPP URI) pair.
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
# Both From and To headers are required
if Null in (from_header, to_header):
subscribe_request.reject(400)
return
log.msg('New subscription from %s to %s' % (from_header.uri, to_header.uri))
if subscribe_request.event != 'presence':
log.msg('Subscription rejected: only presence event is supported')
subscribe_request.reject(489)
return
# Check domain
remote_identity_uri = data.headers['From'].uri
if remote_identity_uri.host not in XMPPGatewayConfig.domains:
log.msg('Subscription rejected: From domain is not a local XMPP domain')
subscribe_request.reject(606)
return
# Get URI representing the SIP side
sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host)
# Get URI representing the XMPP side
request_uri = data.request_uri
xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host)
# Reuse the registered handler for this pair if there is one, otherwise
# create and start a new one. New handlers are added to the registry by
# _NH_S2XPresenceHandlerDidStart, not here.
try:
handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name)
xmpp_identity = Identity(xmpp_leg_uri)
handler = S2XPresenceHandler(sip_identity, xmpp_identity)
NotificationCenter().add_observer(self, sender=handler)
handler.start()
handler.add_sip_subscription(subscribe_request)
def incoming_referral(self, refer_request, data):
    """Reject every incoming REFER with a 405 response."""
    rejection_code = 405
    refer_request.reject(rejection_code)
def incoming_sip_message(self, message_request, data):
# Handle an incoming SIP MESSAGE: validate it, answer it, and relay text or
# is-composing payloads to the XMPP side as stanzas.
content_type = data.headers.get('Content-Type', Null).content_type
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
# Content-Type, From and To are all required
if Null in (content_type, from_header, to_header):
message_request.answer(400)
return
log.msg('New SIP Message from %s to %s' % (from_header.uri, to_header.uri))
# Check domain
if from_header.uri.host not in XMPPGatewayConfig.domains:
log.msg('Message rejected: From domain is not a local XMPP domain')
message_request.answer(606)
return
# For message/cpim the inner body, content type and sender replace the outer ones
if content_type == 'message/cpim':
try:
cpim_message = CPIMMessage.parse(data.body)
except CPIMParserError:
log.msg('Message rejected: CPIM parse error')
message_request.answer(400)
return
else:
body = cpim_message.body
content_type = cpim_message.content_type
sender = cpim_message.sender or from_header
from_uri = sender.uri
else:
body = data.body
from_uri = from_header.uri
to_uri = str(to_header.uri)
# The request is answered with 200 before the payload is relayed
message_request.answer(200)
# A sender URI without a 'gr' parameter gets a generated one
if from_uri.parameters.get('gr', None) is None:
from_uri = SIPURI.new(from_uri)
from_uri.parameters['gr'] = generate_sylk_resource()
sender = Identity(FrozenURI.parse(from_uri))
recipient = Identity(FrozenURI.parse(to_uri))
if content_type in ('text/plain', 'text/html'):
# Exactly one of body / html_body is set, depending on the content type
if content_type == 'text/plain':
html_body = None
else:
html_body = body
body = None
# NormalMessage is used when chat goes over MSRP, ChatMessage otherwise
if XMPPGatewayConfig.use_msrp_for_chat:
message = NormalMessage(sender, recipient, body, html_body, use_receipt=False)
self.xmpp_manager.send_stanza(message)
else:
message = ChatMessage(sender, recipient, body, html_body, use_receipt=False)
self.xmpp_manager.send_stanza(message)
elif content_type == IsComposingDocument.content_type:
# Composing indications are relayed only when MSRP is not used for chat;
# documents that fail to parse are dropped silently
if not XMPPGatewayConfig.use_msrp_for_chat:
try:
msg = IsComposingMessage.parse(body)
except ParserError:
pass
else:
state = 'composing' if msg.state == 'active' else 'paused'
message = ChatComposingIndication(sender, recipient, state, use_receipt=False)
self.xmpp_manager.send_stanza(message)
def handle_notification(self, notification):
    """Call the ``_NH_<notification name>`` method, falling back to ``Null``."""
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
# Out of band XMPP stanza handling
@run_in_green_thread
def _NH_XMPPGotChatMessage(self, notification):
# This notification is only processed here until the ChatSessionHandler
# has both (SIP and XMPP) sessions established
message = notification.data.message
sender = message.sender
recipient = message.recipient
if XMPPGatewayConfig.use_msrp_for_chat:
if recipient.uri.resource is None:
# If recipient resource is not set the session is started from
# the XMPP side
sip_leg_uri = FrozenURI.new(recipient.uri)
xmpp_leg_uri = FrozenURI.new(sender.uri)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# Not found, need to create a new handler and an outgoing SIP session
xmpp_identity = Identity(xmpp_leg_uri)
handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.enqueue_xmpp_message(message)
else:
# Find handler pending XMPP confirmation
# (keyed by the full SIP URI and the bare XMPP JID)
sip_leg_uri = FrozenURI.new(recipient.uri)
xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# Not found, retry with the bare SIP URI and the full XMPP JID
sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host)
xmpp_leg_uri = FrozenURI.new(sender.uri)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID
xmpp_identity = Identity(xmpp_leg_uri)
handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.enqueue_xmpp_message(message)
else:
# Found handler, create XMPP session and establish session
session = XMPPChatSession(local_identity=recipient, remote_identity=sender)
handler.enqueue_xmpp_message(message)
handler.xmpp_identity = session.remote_identity
handler.xmpp_session = session
else:
# MSRP is not used for chat: relay the stanza as a SIP MESSAGE and wait for the result
sip_message_sender = SIPMessageSender(message)
try:
sip_message_sender.send().wait()
except SIPMessageError as e:
# TODO report back an error stanza
log.error('Error sending SIP Message: %s' % e)
@run_in_green_thread
def _NH_XMPPGotNormalMessage(self, notification):
    """Relay an XMPP normal message as a SIP MESSAGE and log a send failure."""
    relay = SIPMessageSender(notification.data.message)
    try:
        relay.send().wait()
    except SIPMessageError as e:
        # TODO report back an error stanza
        log.error('Error sending SIP Message: %s' % e)
@run_in_green_thread
def _NH_XMPPGotComposingIndication(self, notification):
# Relay an XMPP composing indication to SIP as an is-composing document.
# Nothing is sent when MSRP is used for chat.
composing_indication = notification.data.composing_indication
sender = composing_indication.sender
recipient = composing_indication.recipient
if not XMPPGatewayConfig.use_msrp_for_chat:
# 'composing' maps to 'active', any other state to 'idle'; refresh defaults to 30
state = 'active' if composing_indication.state == 'composing' else 'idle'
body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml()
# NOTE(review): the 4th positional argument of NormalMessage is html_body in
# incoming_sip_message, but a content type is passed here -- confirm intended.
message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type)
sip_message_sender = SIPMessageSender(message)
try:
sip_message_sender.send().wait()
except SIPMessageError as e:
# TODO report back an error stanza
log.error('Error sending SIP Message: %s' % e)
def _NH_XMPPGotPresenceSubscriptionRequest(self, notification):
# Start an XMPP-to-SIP presence handler for the stanza's sender/recipient
# pair unless one is already registered.
stanza = notification.data.stanza
# Disregard the resource part, the presence request could be a probe instead of a subscribe
sender_uri = stanza.sender.uri
sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host)
try:
handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)]
except KeyError:
# The stanza's sender identity is reused and its URI replaced by the bare one
xmpp_identity = stanza.sender
xmpp_identity.uri = sender_uri_bare
sip_identity = stanza.recipient
handler = X2SPresenceHandler(sip_identity, xmpp_identity)
notification.center.add_observer(self, sender=handler)
handler.start()
def _NH_XMPPGotMucJoinRequest(self, notification):
# Start an XMPP-to-SIP MUC handler for a join request unless one is already
# registered for this (sender URI, room URI) pair; the added lines below also
# stop any SIP-side invitation still pending for the same room and user.
stanza = notification.data.stanza
# The room URI is the recipient without its resource; the resource is the nickname
muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host)
nickname = stanza.recipient.uri.resource
try:
handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)]
except KeyError:
xmpp_identity = stanza.sender
sip_identity = stanza.recipient
sip_identity.uri = muc_uri
handler = X2SMucHandler(sip_identity, xmpp_identity, nickname)
handler._first_stanza = stanza
notification.center.add_observer(self, sender=handler)
handler.start()
-
- def _NH_XMPPGotMucLeaveRequest(self, notification):
- # TODO: give error?
- pass
+ # Check if there was a pending join request on the SIP side
+ try:
+ handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))]
+ except KeyError:
+ pass
+ else:
+ handler.stop()
+
+ def _NH_XMPPGotMucAddParticipantRequest(self, notification):
+ sender = notification.data.sender
+ recipient = notification.data.recipient
+ participant = notification.data.participant
+ muc_uri = FrozenURI(recipient.uri.user, recipient.uri.host)
+ sender_uri = FrozenURI(sender.uri.user, sender.uri.host)
+ participant_uri = FrozenURI(participant.uri.user, participant.uri.host)
+ sender = Identity(sender_uri)
+ recipient = Identity(muc_uri)
+ participant = Identity(participant_uri)
+ try:
+ handler = self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)]
+ except KeyError:
+ handler = X2SMucInvitationHandler(sender, recipient, participant)
+ self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] = handler
+ notification.center.add_observer(self, sender=handler)
+ handler.start()
# Chat session handling
def _NH_ChatSessionDidStart(self, notification):
    """Move a started handler from the pending map to the established set."""
    started = notification.sender
    log.msg('Chat session established sip:%s <--> xmpp:%s' % (started.sip_identity.uri, started.xmpp_identity.uri))
    # Keys are (sip_leg_uri, xmpp_leg_uri) tuples, so None safely means "not found"
    pending_key = next((key for key, value in self.pending_sessions.items() if value is started), None)
    if pending_key is not None:
        del self.pending_sessions[pending_key]
    self.chat_sessions.add(started)
def _NH_ChatSessionDidEnd(self, notification):
    """Forget an ended chat session and stop observing its handler."""
    ended = notification.sender
    sip_uri, xmpp_uri = ended.sip_identity.uri, ended.xmpp_identity.uri
    log.msg('Chat session ended sip:%s <--> xmpp:%s' % (sip_uri, xmpp_uri))
    self.chat_sessions.remove(ended)
    notification.center.remove_observer(self, sender=ended)
def _NH_ChatSessionDidFail(self, notification):
    """Drop a failed handler from the pending map, log it and stop observing it.

    A handler that is no longer in ``pending_sessions`` previously made the
    tuple unpacking raise TypeError, which also skipped the observer
    removal. The failure is now logged with placeholder URIs instead and
    the observer is always removed.
    """
    handler = notification.sender
    # Keys are (sip_leg_uri, xmpp_leg_uri) tuples, so None safely means "not found"
    failed_key = next((key for key, value in self.pending_sessions.items() if value is handler), None)
    if failed_key is not None:
        del self.pending_sessions[failed_key]
        sip_uri, xmpp_uri = failed_key
    else:
        sip_uri = xmpp_uri = 'unknown'
    log.msg('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason))
    notification.center.remove_observer(self, sender=handler)
# Presence handling
def _NH_S2XPresenceHandlerDidStart(self, notification):
    """Register a started SIP-to-XMPP presence handler under (SIP URI, XMPP URI)."""
    started = notification.sender
    key = (started.sip_identity.uri, started.xmpp_identity.uri)
    log.msg('Presence subscription established sip:%s --> xmpp:%s' % key)
    self.s2x_presence_subscriptions[key] = started
def _NH_S2XPresenceHandlerDidEnd(self, notification):
    """Unregister an ended SIP-to-XMPP presence handler and stop observing it."""
    ended = notification.sender
    key = (ended.sip_identity.uri, ended.xmpp_identity.uri)
    log.msg('Presence subscription ended sip:%s --> xmpp:%s' % key)
    # A missing key is tolerated
    self.s2x_presence_subscriptions.pop(key, None)
    notification.center.remove_observer(self, sender=ended)
def _NH_X2SPresenceHandlerDidStart(self, notification):
    """Register a started XMPP-to-SIP presence handler under (XMPP URI, SIP URI)."""
    started = notification.sender
    key = (started.xmpp_identity.uri, started.sip_identity.uri)
    log.msg('Presence subscription established xmpp:%s --> sip:%s' % key)
    self.x2s_presence_subscriptions[key] = started
def _NH_X2SPresenceHandlerDidEnd(self, notification):
    """Unregister an ended XMPP-to-SIP presence handler and stop observing it."""
    ended = notification.sender
    key = (ended.xmpp_identity.uri, ended.sip_identity.uri)
    log.msg('Presence subscription ended xmpp:%s --> sip:%s' % key)
    # A missing key is tolerated
    self.x2s_presence_subscriptions.pop(key, None)
    notification.center.remove_observer(self, sender=ended)
# MUC handling
def _NH_X2SMucHandlerDidStart(self, notification):
    """Register a started XMPP-to-SIP MUC handler under (XMPP URI, SIP URI)."""
    started = notification.sender
    key = (started.xmpp_identity.uri, started.sip_identity.uri)
    log.msg('Multiparty session established xmpp:%s --> sip:%s' % key)
    self.x2s_muc_sessions[key] = started
def _NH_X2SMucHandlerDidEnd(self, notification):
    """Unregister an ended XMPP-to-SIP MUC handler and stop observing it."""
    ended = notification.sender
    key = (ended.xmpp_identity.uri, ended.sip_identity.uri)
    log.msg('Multiparty session ended xmpp:%s --> sip:%s' % key)
    # A missing key is tolerated
    self.x2s_muc_sessions.pop(key, None)
    notification.center.remove_observer(self, sender=ended)
+ def _NH_X2SMucInvitationHandlerDidStart(self, notification):
+ handler = notification.sender
+ sender_uri = handler.sender.uri
+ muc_uri = handler.recipient.uri
+ participant_uri = handler.participant.uri
+ log.msg('%s invited %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip')))
+
+ def _NH_X2SMucInvitationHandlerDidEnd(self, notification):
+ handler = notification.sender
+ sender_uri = handler.sender.uri
+ muc_uri = handler.recipient.uri
+ participant_uri = handler.participant.uri
+ log.msg('%s added %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip')))
+ del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)]
+ notification.center.remove_observer(self, sender=handler)
+
+ def _NH_X2SMucInvitationHandlerDidFail(self, notification):
+ handler = notification.sender
+ sender_uri = handler.sender.uri
+ muc_uri = handler.recipient.uri
+ participant_uri = handler.participant.uri
+ log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'), notification.data.failure))
+ del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)]
+ notification.center.remove_observer(self, sender=handler)
+
+ def _NH_S2XMucInvitationHandlerDidStart(self, notification):
+ handler = notification.sender
+ muc_uri = handler.sender.uri
+ inviter_uri = handler.inviter.uri
+ recipient_uri = handler.recipient.uri
+ log.msg("%s invited %s to multiparty chat %s" % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip')))
+
+ def _NH_S2XMucInvitationHandlerDidEnd(self, notification):
+ handler = notification.sender
+ muc_uri = handler.sender.uri
+ inviter_uri = handler.inviter.uri
+ recipient_uri = handler.recipient.uri
+ log.msg('%s added %s to multiparty chat %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip')))
+ del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)]
+ notification.center.remove_observer(self, sender=handler)
+
+ def _NH_S2XMucInvitationHandlerDidFail(self, notification):
+ handler = notification.sender
+ muc_uri = handler.sender.uri
+ inviter_uri = handler.inviter.uri
+ recipient_uri = handler.recipient.uri
+ log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'), str(notification.data.failure)))
+ del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)]
+ notification.center.remove_observer(self, sender=handler)
+
diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py
index b063a45..865564b 100644
--- a/sylk/applications/xmppgateway/muc.py
+++ b/sylk/applications/xmppgateway/muc.py
@@ -1,247 +1,494 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import os
+import random
import uuid
-from application.notification import IObserver, NotificationCenter
-from application.python import Null
+from application.notification import IObserver, NotificationCenter, NotificationData
+from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
-from sipsimple.account import AccountManager
+from eventlib import coros, proc
+from sipsimple.account import AccountManager, BonjourAccount
from sipsimple.configuration.settings import SIPSimpleSettings
-from sipsimple.core import SIPURI
-from sipsimple.core import ContactHeader, FromHeader, ToHeader
+from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages
+from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams.msrp import ChatStreamError
from sipsimple.streams.applications.chat import CPIMIdentity
+from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
+from time import time
+from twisted.internet import reactor
from zope.interface import implements
from sylk.applications import ApplicationLogger
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession
-from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, STANZAS_NS
+from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS
+from sylk.configuration import SIPConfig
from sylk.extensions import ChatStream
from sylk.session import Session
log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1])
+class ReferralError(Exception):
+ def __init__(self, error, code=0):
+ self.error = error
+ self.code = code
+
+
+class SIPReferralDidFail(Exception):
+ def __init__(self, data):
+ self.data = data
+
+
+class MucInvitationFailure(object):
+ def __init__(self, code, reason):
+ self.code = code
+ self.reason = reason
+ def __str__(self):
+ return '%s (%s)' % (self.code, self.reason)
+
+
+class X2SMucInvitationHandler(object):
+ implements(IObserver)
+
+ def __init__(self, sender, recipient, participant):
+ self.sender = sender
+ self.recipient = recipient
+ self.participant = participant
+ self.active = False
+ self.route = None
+ self._channel = coros.queue()
+ self._referral = None
+ self._wakeup_timer = None
+ self._failure = None
+
+ def start(self):
+ notification_center = NotificationCenter()
+ notification_center.add_observer(self, name='DNSNameserversDidChange')
+ notification_center.add_observer(self, name='SystemIPAddressDidChange')
+ notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ proc.spawn(self._run)
+ notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self)
+
+ def _run(self):
+ notification_center = NotificationCenter()
+ settings = SIPSimpleSettings()
+
+ sender_uri = self.sender.uri.as_sip_uri()
+ recipient_uri = self.recipient.uri.as_sip_uri()
+ participant_uri = self.participant.uri.as_sip_uri()
+
+ try:
+ # Lookup routes
+ account = AccountManager().sylkserver_account
+ if account is BonjourAccount():
+ raise ReferralError(error='Bonjour account is not supported')
+ elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
+ uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
+ elif account.sip.always_use_my_proxy:
+ uri = SIPURI(host=account.id.domain)
+ else:
+ uri = SIPURI.new(recipient_uri)
+ lookup = DNSLookup()
+ try:
+ routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
+ except DNSLookupError, e:
+ timeout = random.uniform(15, 30)
+ raise ReferralError(error='DNS lookup failed: %s' % e)
+
+ timeout = time() + 30
+ for route in routes:
+ self.route = route
+ remaining_time = timeout - time()
+ if remaining_time > 0:
+ transport = route.transport
+ parameters = {} if transport=='udp' else {'transport': transport}
+ contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
+ refer_to_header = ReferToHeader(str(participant_uri))
+ refer_to_header.parameters['method'] = 'INVITE'
+ referral = Referral(recipient_uri, FromHeader(sender_uri),
+ ToHeader(recipient_uri),
+ refer_to_header,
+ ContactHeader(contact_uri),
+ RouteHeader(route.uri),
+ account.credentials)
+ notification_center.add_observer(self, sender=referral)
+ try:
+ referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
+ except SIPCoreError:
+ notification_center.remove_observer(self, sender=referral)
+ timeout = 5
+ raise ReferralError(error='Internal error')
+ self._referral = referral
+ try:
+ while True:
+ notification = self._channel.wait()
+ if notification.name == 'SIPReferralDidStart':
+ break
+ except SIPReferralDidFail, e:
+ notification_center.remove_observer(self, sender=referral)
+ self._referral = None
+ if e.data.code in (403, 405):
+ raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
+ else:
+ # Otherwise just try the next route
+ continue
+ else:
+ break
+ else:
+ self.route = None
+ raise ReferralError(error='No more routes to try')
+ # At this point it is subscribed. Handle notifications and ending/failures.
+ try:
+ self.active = True
+ while True:
+ notification = self._channel.wait()
+ if notification.name == 'SIPReferralDidEnd':
+ break
+ except SIPReferralDidFail, e:
+ notification_center.remove_observer(self, sender=self._referral)
+ raise ReferralError(error=e.data.reason, code=e.data.code)
+ else:
+ notification_center.remove_observer(self, sender=self._referral)
+ finally:
+ self.active = False
+ except ReferralError, e:
+ self._failure = MucInvitationFailure(e.code, e.error)
+ finally:
+ if self._wakeup_timer is not None and self._wakeup_timer.active():
+ self._wakeup_timer.cancel()
+ self._wakeup_timer = None
+ notification_center.remove_observer(self, name='DNSNameserversDidChange')
+ notification_center.remove_observer(self, name='SystemIPAddressDidChange')
+ notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ self._referral = None
+ if self._failure is not None:
+ notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
+ else:
+ notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self)
+
+ def _refresh(self):
+ account = AccountManager().sylkserver_account
+ transport = self.route.transport
+ parameters = {} if transport=='udp' else {'transport': transport}
+ contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
+ contact_header = ContactHeader(contact_uri)
+ self._referral.refresh(contact_header=contact_header, timeout=2)
+
+ @run_in_twisted_thread
+ def handle_notification(self, notification):
+ handler = getattr(self, '_NH_%s' % notification.name, Null)
+ handler(notification)
+
+ def _NH_SIPReferralDidStart(self, notification):
+ self._channel.send(notification)
+
+ def _NH_SIPReferralDidEnd(self, notification):
+ self._channel.send(notification)
+
+ def _NH_SIPReferralDidFail(self, notification):
+ self._channel.send_exception(SIPReferralDidFail(notification.data))
+
+ def _NH_SIPReferralGotNotify(self, notification):
+ self._channel.send(notification)
+
+ def _NH_DNSNameserversDidChange(self, notification):
+ if self.active:
+ self._refresh()
+
+ def _NH_SystemIPAddressDidChange(self, notification):
+ if self.active:
+ self._refresh()
+
+ def _NH_SystemDidWakeUpFromSleep(self, notification):
+ if self._wakeup_timer is None:
+ def wakeup_action():
+ if self.active:
+ self._refresh()
+ self._wakeup_timer = None
+ self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
+
+
+class S2XMucInvitationHandler(object):
+ implements(IObserver)
+
+ def __init__(self, session, sender, recipient, inviter):
+ self.session = session
+ self.sender = sender
+ self.recipient = recipient
+ self.inviter = inviter
+ self._timer = None
+ self._failure = None
+
+ def start(self):
+ notification_center = NotificationCenter()
+ notification_center.add_observer(self, sender=self.session)
+ stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex)
+ xmpp_manager = XMPPManager()
+ xmpp_manager.send_muc_stanza(stanza)
+ self._timer = reactor.callLater(90, self._timeout)
+ notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self)
+
+ def stop(self):
+ if self._timer is not None and self._timer.active():
+ self._timer.cancel()
+ self._timer = None
+ notification_center = NotificationCenter()
+ if self.session is not None:
+ notification_center.remove_observer(self, sender=self.session)
+ reactor.callLater(5, self._end_session, self.session)
+ self.session = None
+ if self._failure is not None:
+ notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
+ else:
+ notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self)
+
+ def _end_session(self, session):
+ try:
+ session.end(480)
+ except Exception:
+ pass
+
+ def _timeout(self):
+ NotificationCenter().remove_observer(self, sender=self.session)
+ try:
+ self.session.end(408)
+ except Exception:
+ pass
+ self.session = None
+ self._failure = MucInvitationFailure('Timeout', 408)
+ self.stop()
+
+ def handle_notification(self, notification):
+ handler = getattr(self, '_NH_%s' % notification.name, Null)
+ handler(notification)
+
+ def _NH_SIPSessionDidFail(self, notification):
+ notification.center.remove_observer(self, sender=self.session)
+ self.session = None
+ self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code)
+ self.stop()
+
+
class X2SMucHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity, nickname):
# Store both identities and the nickname, and reset all session state.
self.sip_identity = sip_identity
self.xmpp_identity = xmpp_identity
self.nickname = nickname
# Session and stream objects are created later
self._xmpp_muc_session = None
self._sip_session = None
self._msrp_stream = None
self._first_stanza = None
self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza
self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza
self._participants = set() # set of (URI, nickname) tuples
# Set by end() so that a second call returns immediately
self.ended = False
def start(self):
notification_center = NotificationCenter()
self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
notification_center.add_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session.start()
notification_center.post_notification('X2SMucHandlerDidStart', sender=self)
self._start_sip_session()
def end(self):
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_muc_session is not None:
notification_center.remove_observer(self, sender=self._xmpp_muc_session)
# Send indication that the user has been kicked from the room
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('307')
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
self._xmpp_muc_session.end()
self._xmpp_muc_session = None
if self._sip_session is not None:
notification_center.remove_observer(self, sender=self._sip_session)
self._sip_session.end()
self._sip_session = None
self.ended = True
notification_center.post_notification('X2SMucHandlerDidEnd', sender=self)
@run_in_green_thread
def _start_sip_session(self):
# self.xmpp_identity is our local identity
from_uri = self.xmpp_identity.uri.as_sip_uri()
del from_uri.parameters['gr'] # no GRUU in From header
contact_uri = self.xmpp_identity.uri.as_sip_uri()
contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8'))
to_uri = self.sip_identity.uri.as_sip_uri()
lookup = DNSLookup()
settings = SIPSimpleSettings()
account = AccountManager().sylkserver_account
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = to_uri
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError:
log.warning('DNS lookup error while looking for %s proxy' % uri)
self.end()
return
self._msrp_stream = ChatStream()
route = routes.pop(0)
from_header = FromHeader(from_uri)
to_header = ToHeader(to_uri)
contact_header = ContactHeader(contact_uri)
self._sip_session = Session(account)
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._sip_session)
notification_center.add_observer(self, sender=self._msrp_stream)
self._sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self._msrp_stream])
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidStart(self, notification):
log.msg("SIP multiparty session %s started" % notification.sender._invitation.call_id)
if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed:
self.end()
return
message_id = self._msrp_stream.set_local_nickname(self.nickname)
self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza)
self._first_stanza = None
def _NH_SIPSessionDidEnd(self, notification):
log.msg("SIP multiparty session %s ended" % notification.sender._invitation.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
def _NH_SIPSessionDidFail(self, notification):
log.msg("SIP multiparty session %s failed" % notification.sender._invitation.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
def _NH_SIPSessionGotProposal(self, notification):
self._sip_session.reject_proposal()
def _NH_SIPSessionTransferNewIncoming(self, notification):
self._sip_session.reject_transfer(403)
def _NH_SIPSessionGotConferenceInfo(self, notification):
# Translate to XMPP payload
xmpp_manager = XMPPManager()
own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host)
conference_info = notification.data.conference_info
new_participants = set()
for user in conference_info.users:
user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity)
nickname = user.display_text.value if user.display_text else user.entity
new_participants.add((user_uri, nickname))
# Remove participants that are no longer in the room
for uri, nickname in self._participants - new_participants:
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
xmpp_manager.send_muc_stanza(stanza)
# Send presence for current participants
for uri, nickname in new_participants:
if uri == own_uri:
continue
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = Identity(uri)
xmpp_manager.send_muc_stanza(stanza)
self._participants = new_participants
# Send own status last
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('110')
xmpp_manager.send_muc_stanza(stanza)
def _NH_ChatStreamGotMessage(self, notification):
# Notification is sent by the MSRP stream
if not self._xmpp_muc_session:
return
message = notification.data.message
content_type = message.content_type.lower()
if content_type not in ('text/plain', 'text/html'):
return
if content_type == 'text/plain':
html_body = None
body = message.body
else:
html_body = message.body
body = None
resource = message.sender.display_name or str(message.sender.uri)
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource))
- self._xmpp_muc_session.send_message(sender, body, html_body, message_id=str(uuid.uuid4()))
+ self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex)
self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
def _NH_ChatStreamDidSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
self.nickname = nickname
def _NH_ChatStreamDidNotSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)])
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(error_stanza)
def _NH_ChatStreamDidDeliverMessage(self, notification):
# Echo back the message to the sender
stanza = self._pending_messages_map.pop(notification.data.message_id)
stanza.sender, stanza.recipient = stanza.recipient, stanza.sender
stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname)
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
def _NH_ChatStreamDidNotDeliverMessage(self, notification):
self._pending_messages_map.pop(notification.data.message_id)
def _NH_XMPPIncomingMucSessionDidEnd(self, notification):
notification.center.remove_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session = None
self.end()
def _NH_XMPPIncomingMucSessionGotMessage(self, notification):
if not self._sip_session:
return
message = notification.data.message
sender_uri = message.sender.uri.as_sip_uri()
del sender_uri.parameters['gr'] # no GRUU in CPIM From header
sender = CPIMIdentity(sender_uri, display_name=self.nickname)
message_id = self._msrp_stream.send_message(message.body, 'text/plain', local_identity=sender)
self._pending_messages_map[message_id] = message
# Message will be echoed back to the sender on ChatStreamDidDeliverMessage
def _NH_XMPPIncomingMucSessionChangedNickname(self, notification):
if not self._sip_session:
return
nickname = notification.data.nickname
try:
message_id = self._msrp_stream.set_local_nickname(nickname)
except ChatStreamError:
return
self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza)
diff --git a/sylk/applications/xmppgateway/util.py b/sylk/applications/xmppgateway/util.py
index 55d7fd9..cac0927 100644
--- a/sylk/applications/xmppgateway/util.py
+++ b/sylk/applications/xmppgateway/util.py
@@ -1,31 +1,36 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
from cStringIO import StringIO
from formatter import AbstractFormatter, DumbWriter
from htmllib import HTMLParser, HTMLParseError
-__all__ = ['html2text', 'text2html']
+__all__ = ['html2text', 'text2html', 'format_uri']
def html2text(data):
    """Return the text rendering of HTML *data*, or '' if it cannot be parsed."""
    # Based on http://stackoverflow.com/questions/328356/extracting-text-from-html-file-using-python
    buf = StringIO()
    html_parser = HTMLParser(AbstractFormatter(DumbWriter(buf)))
    try:
        html_parser.feed(data)
    except HTMLParseError:
        return ''
    # Only reached when feeding succeeded; errors from close() propagate.
    html_parser.close()
    return buf.getvalue()
+
# XHTML-IM envelope; the %(data)s placeholder receives the payload verbatim.
xhtml_im_template = """<html xmlns='http://jabber.org/protocol/xhtml-im'>
<body xmlns='http://www.w3.org/1999/xhtml'>
%(data)s
</body>
</html>"""


def text2html(data):
    """Wrap *data* in the XHTML-IM envelope (no escaping is applied)."""
    return xhtml_im_template % dict(data=data)
+
def format_uri(uri, scheme=''):
    """Return ``user@host`` for *uri*, prefixed with ``scheme:`` when *scheme* is non-empty."""
    prefix = scheme + ':' if scheme else ''
    return '%s%s@%s' % (prefix, uri.user, uri.host)
+
diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py
index 56a3e75..e14ed78 100644
--- a/sylk/applications/xmppgateway/xmpp/__init__.py
+++ b/sylk/applications/xmppgateway/xmpp/__init__.py
@@ -1,343 +1,348 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import os
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.jid import JID, internJID
from wokkel import disco
from wokkel.component import InternalComponent, Router as _Router
from wokkel.server import ServerService, XMPPS2SServerFactory, DeferredS2SClientFactory
from zope.interface import implements
from sylk.applications import ApplicationLogger
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import FrozenURI
from sylk.applications.xmppgateway.xmpp.logger import Logger as XMPPLogger
from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, MessageProtocol, MUCServerProtocol, PresenceProtocol
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager
from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager
log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1])
xmpp_logger = XMPPLogger()
# Utility classes
class Router(_Router):
    def route(self, stanza):
        """
        Route a stanza. (subclassed to avoid verbose logging)

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        """
        host = internJID(stanza['to']).host
        # Hosts without a route of their own go to the route stored under None.
        key = host if host in self.routes else None
        self.routes[key].send(stanza)
class XMPPS2SServerFactory(XMPPS2SServerFactory):
    """Wokkel's server factory with raw traffic sent to the XMPP trace logger."""

    def onConnectionMade(self, xs):
        """Run the base handler, then hook raw data logging when tracing is enabled.

        @param xs: the XML stream of the new connection.
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed. At call time the
        # module-level name refers to this subclass, not the wokkel base.
        super(XMPPS2SServerFactory, self).onConnectionMade(xs)

        def logDataIn(buf):
            buf = buf.strip()
            if buf:
                xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf)

        def logDataOut(buf):
            buf = buf.strip()
            if buf:
                xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf)

        if XMPPGatewayConfig.trace_xmpp:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut
class DeferredS2SClientFactory(DeferredS2SClientFactory):
    """Wokkel's client factory with raw traffic sent to the XMPP trace logger."""

    def onConnectionMade(self, xs):
        """Run the base handler, then hook raw data logging when tracing is enabled.

        @param xs: the XML stream of the new connection.
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed. At call time the
        # module-level name refers to this subclass, not the wokkel base.
        super(DeferredS2SClientFactory, self).onConnectionMade(xs)

        def logDataIn(buf):
            buf = buf.strip()
            if buf:
                xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf)

        def logDataOut(buf):
            buf = buf.strip()
            if buf:
                xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf)

        if XMPPGatewayConfig.trace_xmpp:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut
# Patch Wokkel's DeferredS2SClientFactory to use our logger
import wokkel.server
wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory
# Remove only the local name bound by the import above. 'del wokkel.server'
# would delete the 'server' attribute from the shared wokkel package itself.
del wokkel
# Manager
# Singleton that owns the XMPP server-to-server service, the internal
# components for the regular and MUC domains, and the chat / MUC /
# subscription managers. Stanza notifications posted by the components are
# either handed to an existing session/subscription channel or re-posted
# with this manager as the sender.
class XMPPManager(object):
__metaclass__ = Singleton
implements(IObserver)
# Build the router, services, components and protocol handlers; nothing
# starts listening until start() is called.
def __init__(self):
config = XMPPGatewayConfig
self.stopped = False
self.domains = set(config.domains)
# One MUC domain per configured domain: '<muc_prefix>.<domain>'
self.muc_domains = set(['%s.%s' % (config.muc_prefix, domain) for domain in self.domains])
router = Router()
self._server_service = ServerService(router)
self._server_service.domains = self.domains | self.muc_domains
self._server_service.logTraffic = False # done manually
self._s2s_factory = XMPPS2SServerFactory(self._server_service)
self._s2s_factory.logTraffic = False # done manually
self._internal_component = InternalComponent(router)
self._internal_component.domains = self.domains
self._message_protocol = MessageProtocol()
self._message_protocol.setHandlerParent(self._internal_component)
self._presence_protocol = PresenceProtocol()
self._presence_protocol.setHandlerParent(self._internal_component)
self._disco_protocol = DiscoProtocol()
self._disco_protocol.setHandlerParent(self._internal_component)
self._muc_component = InternalComponent(router)
self._muc_component.domains = self.muc_domains
self._muc_protocol = MUCServerProtocol()
self._muc_protocol.setHandlerParent(self._muc_component)
self._disco_muc_protocol = DiscoProtocol()
self._disco_muc_protocol.setHandlerParent(self._muc_component)
self._s2s_listener = None
self.chat_session_manager = XMPPChatSessionManager()
self.muc_session_manager = XMPPMucSessionManager()
self.subscription_manager = XMPPSubscriptionManager()
# Start the logger, the TCP listener, the managers and both components,
# and begin observing notifications sent by the two components.
def start(self):
self.stopped = False
xmpp_logger.start()
config = XMPPGatewayConfig
self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip)
listen_address = self._s2s_listener.getHost()
log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port))
self.chat_session_manager.start()
self.muc_session_manager.start()
self.subscription_manager.start()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._internal_component)
notification_center.add_observer(self, sender=self._muc_component)
self._internal_component.startService()
self._muc_component.startService()
# Undo start(). NOTE(review): self._s2s_listener is None until start() has
# run, so this presumably must only be called after start() - confirm.
def stop(self):
self.stopped = True
self._s2s_listener.stopListening()
self.subscription_manager.stop()
self.muc_session_manager.stop()
self.chat_session_manager.stop()
self._internal_component.stopService()
self._muc_component.stopService()
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self._internal_component)
notification_center.remove_observer(self, sender=self._muc_component)
xmpp_logger.stop()
# Send a stanza through the regular-domain component; dropped once stopped.
def send_stanza(self, stanza):
if self.stopped:
return
self._internal_component.send(stanza.to_xml_element())
# Send a stanza through the MUC component; dropped once stopped.
def send_muc_stanza(self, stanza):
if self.stopped:
return
self._muc_component.send(stanza.to_xml_element())
# Dispatch to the _NH_<notification name> method; unknown names hit Null.
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# Process message stanzas
# Sessions are keyed by (recipient URI, sender URI). A stanza with no
# matching session is re-posted under the same name with this manager as
# the sender (receipts are dropped instead).
def _NH_XMPPGotChatMessage(self, notification):
message = notification.data.message
try:
session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)]
except KeyError:
notification.center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data)
else:
session.channel.send(message)
# Normal messages are never matched against a session; always re-posted.
def _NH_XMPPGotNormalMessage(self, notification):
notification.center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data)
def _NH_XMPPGotComposingIndication(self, notification):
composing_indication = notification.data.composing_indication
try:
session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)]
except KeyError:
notification.center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data)
else:
session.channel.send(composing_indication)
def _NH_XMPPGotErrorMessage(self, notification):
error_message = notification.data.error_message
try:
session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)]
except KeyError:
notification.center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data)
else:
session.channel.send(error_message)
# A receipt without a matching chat session is silently ignored.
def _NH_XMPPGotReceipt(self, notification):
receipt = notification.data.receipt
try:
session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)]
except KeyError:
pass
else:
session.channel.send(receipt)
# Process presence stanzas
# Availability is looked up in the outgoing subscriptions using the
# sender's URI rebuilt from user and host only.
def _NH_XMPPGotPresenceAvailability(self, notification):
stanza = notification.data.presence_stanza
if stanza.recipient.uri.resource is not None:
# Skip directed presence
return
sender_uri = stanza.sender.uri
sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host)
try:
subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)]
except KeyError:
# Ignore incoming presence stanzas if there is no subscription
pass
else:
subscription.channel.send(stanza)
# 'subscribed'/'unsubscribed' go to the matching outgoing subscription;
# 'subscribe'/'unsubscribe' go to the matching incoming one, and only an
# unmatched 'subscribe' is turned into a subscription request notification.
def _NH_XMPPGotPresenceSubscriptionStatus(self, notification):
stanza = notification.data.presence_stanza
if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None:
# Skip directed presence
return
if stanza.type in ('subscribed', 'unsubscribed'):
try:
subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)]
except KeyError:
pass
else:
subscription.channel.send(stanza)
elif stanza.type in ('subscribe', 'unsubscribe'):
try:
subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)]
except KeyError:
if stanza.type == 'subscribe':
notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza))
else:
subscription.channel.send(stanza)
# A probe with no matching incoming subscription is reported as a
# subscription request.
def _NH_XMPPGotPresenceProbe(self, notification):
stanza = notification.data.presence_stanza
if stanza.recipient.uri.resource is not None:
# Skip directed presence
return
sender_uri = stanza.sender.uri
sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host)
try:
subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)]
except KeyError:
notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza))
else:
subscription.channel.send(stanza)
# Process muc stanzas
# Incoming MUC sessions are keyed by (room URI built from the recipient's
# user and host, sender URI).
def _NH_XMPPMucGotGroupChat(self, notification):
message = notification.data.message
muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host)
try:
session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)]
except KeyError:
# Ignore groupchat messages if there was no session created
pass
else:
session.channel.send(message)
# Without a session, available presence becomes a join request and
# unavailable presence a leave request; senders without a resource are ignored.
def _NH_XMPPMucGotPresenceAvailability(self, notification):
stanza = notification.data.presence_stanza
if not stanza.sender.uri.resource:
return
muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host)
try:
session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)]
except KeyError:
if stanza.available:
notification.center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza))
else:
notification.center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza))
else:
session.channel.send(stanza)
+ # Re-post a MUC invitation as an add-participant request carrying the
+ # invitation's sender, recipient and invited user.
+ def _NH_XMPPMucGotInvitation(self, notification):
+ invitation = notification.data.invitation
+ data = NotificationData(sender=invitation.sender, recipient=invitation.recipient, participant=invitation.invited_user)
+ notification.center.post_notification('XMPPGotMucAddParticipantRequest', sender=self, data=data)
+
# Disco
# Answer a disco#info request through the deferred carried in the
# notification; targets outside the served domains get 'service-unavailable'.
def _NH_XMPPGotDiscoInfoRequest(self, notification):
d = notification.data.deferred
target_uri = notification.data.target.uri
if target_uri.host not in self.domains | self.muc_domains:
d.errback(StanzaError('service-unavailable'))
return
elements = []
if target_uri.host in self.muc_domains:
elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service'))
elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc'))
if target_uri.user:
# We can't say much more here, because the actual conference may end up on a different server
elements.append(disco.DiscoFeature('muc_temporary'))
elements.append(disco.DiscoFeature('muc_unmoderated'))
else:
if not target_uri.user:
elements.append(disco.DiscoIdentity('gateway', 'simple'))
elements.append(disco.DiscoIdentity('server', 'im'))
else:
elements.append(disco.DiscoIdentity('account', 'registered'))
elements.append(disco.DiscoFeature('http://sylkserver.com'))
d.callback(elements)
# Answer a disco#items request: a served domain queried without a user part
# lists its MUC service; every other target gets an empty list.
def _NH_XMPPGotDiscoItemsRequest(self, notification):
d = notification.data.deferred
target_uri = notification.data.target.uri
items = []
if not target_uri.user and target_uri.host in self.domains:
items.append(disco.DiscoItem(JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target_uri.host)), name='Multi-User Chat'))
d.callback(items)
diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py
index 1223dc1..dffdb48 100644
--- a/sylk/applications/xmppgateway/xmpp/protocols.py
+++ b/sylk/applications/xmppgateway/xmpp/protocols.py
@@ -1,257 +1,277 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
from application.notification import NotificationCenter, NotificationData
from twisted.internet import defer
from wokkel.disco import DiscoHandler
from wokkel.muc import UserPresence
from wokkel.xmppim import BasePresenceProtocol, MessageProtocol, PresenceProtocol
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI
-from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, ErrorStanza, \
- NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, \
- AvailabilityPresence, SubscriptionPresence, ProbePresence, \
- MUCAvailabilityPresence, GroupChatMessage \
+from sylk.applications.xmppgateway.xmpp.stanzas import (RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza,
+ NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication,
+ AvailabilityPresence, SubscriptionPresence, ProbePresence,
+ MUCAvailabilityPresence, GroupChatMessage, IncomingInvitationMessage)
__all__ = ['DiscoProtocol', 'MessageProtocol', 'MUCServerProtocol', 'PresenceProtocol']
class MessageProtocol(MessageProtocol):
    """Turn incoming <message/> stanzas into gateway notifications."""

    messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error'

    def _onMessage(self, message):
        """Normalize unknown message types to 'normal' and hand over to onMessage."""
        if message.handled:
            return
        messageType = message.getAttribute("type")
        if messageType not in self.messageTypes:
            message["type"] = 'normal'
        self.onMessage(message)

    def onMessage(self, msg):
        """Classify *msg* and post the matching notification.

        Posts, with ``self.parent`` as the sender, one of XMPPGotErrorMessage,
        XMPPGotChatMessage, XMPPGotNormalMessage, XMPPGotComposingIndication
        or XMPPGotReceipt. Stanzas matching none of these are dropped.
        """
        notification_center = NotificationCenter()

        sender_uri = FrozenURI.parse('xmpp:'+msg['from'])
        sender = Identity(sender_uri)
        recipient_uri = FrozenURI.parse('xmpp:'+msg['to'])
        recipient = Identity(recipient_uri)
        msg_type = msg.getAttribute('type')
        msg_id = msg.getAttribute('id', None)
        is_empty = msg.body is None and msg.html is None

        if msg_type == 'error':
            error_type = msg.error['type']
            conditions = [(child.name, child.defaultUri) for child in msg.error.elements()]
            error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg_id)
            notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message))
            return

        if msg_type in (None, 'normal', 'chat') and not is_empty:
            body = None
            html_body = None
            if msg.html is not None:
                html_body = msg.html.toXml()
            if msg.body is not None:
                body = unicode(msg.body)
            try:
                elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS)
            except StopIteration:
                use_receipt = False
            else:
                use_receipt = elem.name == u'request'
            if msg_type == 'chat':
                message = ChatMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt)
                notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message))
            else:
                message = NormalMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt)
                notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message))
            return

        # Check if it's a composing indication
        if msg_type == 'chat' and is_empty:
            # A single scan is enough: the first chat-state child decides.
            elem = next((c for c in msg.elements() if c.uri == CHATSTATES_NS), None)
            if elem is not None:
                composing_indication = ChatComposingIndication(sender, recipient, elem.name, id=msg_id)
                notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication))
                return

        # Check if it's a receipt acknowledgement
        if is_empty:
            try:
                elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS)
            except StopIteration:
                pass
            else:
                if elem.name == u'received' and msg_id is not None:
                    receipt = MessageReceipt(sender, recipient, msg_id)
                    notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt))
class PresenceProtocol(PresenceProtocol):
    """Turn incoming presence stanzas into gateway notifications."""

    @staticmethod
    def _parse_endpoints(element):
        """Return (sender, recipient) identities from the element's from/to attributes."""
        sender = Identity(FrozenURI.parse('xmpp:'+element['from']))
        recipient = Identity(FrozenURI.parse('xmpp:'+element['to']))
        return sender, recipient

    def availableReceived(self, stanza):
        sender, recipient = self._parse_endpoints(stanza.element)
        stanza_id = stanza.element.getAttribute('id')
        presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=stanza.show, statuses=stanza.statuses, id=stanza_id)
        NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def unavailableReceived(self, stanza):
        sender, recipient = self._parse_endpoints(stanza.element)
        stanza_id = stanza.element.getAttribute('id')
        presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=stanza_id)
        NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def _process_subscription_stanza(self, stanza):
        """Post XMPPGotPresenceSubscriptionStatus for any of the four subscription stanza types."""
        sender, recipient = self._parse_endpoints(stanza.element)
        stanza_id = stanza.element.getAttribute('id')
        stanza_type = stanza.element.getAttribute('type')
        presence_stanza = SubscriptionPresence(sender, recipient, stanza_type, id=stanza_id)
        NotificationCenter().post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def subscribedReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def unsubscribedReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def subscribeReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def unsubscribeReceived(self, stanza):
        self._process_subscription_stanza(stanza)

    def probeReceived(self, stanza):
        sender, recipient = self._parse_endpoints(stanza.element)
        stanza_id = stanza.element.getAttribute('id')
        presence_stanza = ProbePresence(sender, recipient, id=stanza_id)
        NotificationCenter().post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))
class MUCServerProtocol(BasePresenceProtocol):
    """Turn stanzas addressed to the MUC component into gateway notifications."""

    messageTypes = None, 'normal', 'chat', 'groupchat'

    presenceTypeParserMap = {'available': UserPresence,
                             'unavailable': UserPresence}

    def connectionInitialized(self):
        BasePresenceProtocol.connectionInitialized(self)
        self.xmlstream.addObserver('/message', self._onMessage)

    @staticmethod
    def _parse_endpoints(element):
        """Return (sender, recipient) identities from the element's from/to attributes."""
        sender = Identity(FrozenURI.parse('xmpp:'+element['from']))
        recipient = Identity(FrozenURI.parse('xmpp:'+element['to']))
        return sender, recipient

    def _onMessage(self, message):
        """Route groupchat messages and invitations; ignore errors and everything else."""
        if message.handled:
            return
        messageType = message.getAttribute("type")
        if messageType == 'error':
            return
        if messageType not in self.messageTypes:
            message['type'] = 'normal'
        if messageType == 'groupchat':
            self.onGroupChat(message)
            return
        to_uri = FrozenURI.parse('xmpp:'+message['to'])
        if to_uri.host not in self.parent.domains:
            return
        # Check if it's an invitation
        x = message.x
        invite = x.invite if x is not None else None
        if invite is not None and invite.uri == MUC_USER_NS:
            self.onInvitation(message)
        # TODO: give error, private messages not supported

    def onGroupChat(self, msg):
        sender, recipient = self._parse_endpoints(msg)
        html_body = msg.html.toXml() if msg.html is not None else None
        body = unicode(msg.body) if msg.body is not None else None
        message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None))
        NotificationCenter().post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message))

    def onInvitation(self, msg):
        sender, recipient = self._parse_endpoints(msg)
        invite = msg.x.invite
        invited_user = Identity(FrozenURI.parse('xmpp:'+invite['to']))
        reason_element = invite.reason
        reason = None
        if reason_element is not None and reason_element.uri == MUC_USER_NS:
            reason = unicode(reason_element)
        invitation = IncomingInvitationMessage(sender, recipient, invited_user=invited_user, reason=reason, id=msg.getAttribute('id', None))
        NotificationCenter().post_notification('XMPPMucGotInvitation', sender=self.parent, data=NotificationData(invitation=invitation))

    def _post_availability(self, stanza, available):
        """Post XMPPMucGotPresenceAvailability for *stanza* with the given availability."""
        sender, recipient = self._parse_endpoints(stanza.element)
        stanza_id = stanza.element.getAttribute('id')
        presence_stanza = MUCAvailabilityPresence(sender, recipient, available=available, id=stanza_id)
        NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza))

    def availableReceived(self, stanza):
        self._post_availability(stanza, True)

    def unavailableReceived(self, stanza):
        self._post_availability(stanza, False)
class DiscoProtocol(DiscoHandler):
    """Answer disco requests by delegating to whoever observes the notifications."""

    def _post_request(self, name, requestor, target, nodeIdentifier):
        """Post notification *name* carrying a fresh Deferred and return that Deferred."""
        d = defer.Deferred()
        sender = Identity(FrozenURI.parse(requestor))
        target = Identity(FrozenURI.parse(target))
        data = NotificationData(sender=sender, target=target, node_identifier=nodeIdentifier, deferred=d)
        NotificationCenter().post_notification(name, sender=self.parent, data=data)
        return d

    def info(self, requestor, target, nodeIdentifier):
        """
        Gather data for a disco info request.

        @param requestor: The entity that sent the request.
        @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param target: The entity the request was sent to.
        @type target: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The optional node being queried, or C{''}.
        @type nodeIdentifier: C{unicode}
        @return: Deferred handed to the XMPPGotDiscoInfoRequest observers.
        @rtype: L{defer.Deferred}
        """
        return self._post_request('XMPPGotDiscoInfoRequest', requestor, target, nodeIdentifier)

    def items(self, requestor, target, nodeIdentifier):
        """
        Gather data for a disco items request.

        @param requestor: The entity that sent the request.
        @type requestor: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param target: The entity the request was sent to.
        @type target: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The optional node being queried, or C{''}.
        @type nodeIdentifier: C{unicode}
        @return: Deferred handed to the XMPPGotDiscoItemsRequest observers.
        @rtype: L{defer.Deferred}
        """
        return self._post_request('XMPPGotDiscoItemsRequest', requestor, target, nodeIdentifier)
diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
index bd06aa4..4fcd8b3 100644
--- a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
+++ b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
@@ -1,249 +1,281 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
from twisted.words.xish import domish
from sylk import __version__ as SYLK_VERSION
from sylk.applications.xmppgateway.util import html2text
# XML namespace URIs
CHATSTATES_NS = 'http://jabber.org/protocol/chatstates'
RECEIPTS_NS = 'urn:xmpp:receipts'
STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas'
XML_NS = 'http://www.w3.org/XML/1998/namespace'
CAPS_NS = 'http://jabber.org/protocol/caps'

# Multi-user chat namespaces; the user one is derived from the base one
MUC_NS = 'http://jabber.org/protocol/muc'
MUC_USER_NS = MUC_NS + '#user'

# Extra capability names; when non-empty they are joined with spaces into the
# 'ext' attribute of the caps element written by AvailabilityPresence
SYLK_CAPS = []
class BaseStanza(object):
    """
    Common base for stanzas which have a sender, a recipient and an optional id.

    Subclasses set stanza_type to the name of the top level XML element and
    may set type to fill in the element's 'type' attribute.
    """

    stanza_type = None     # to be defined by subclasses
    type = None

    def __init__(self, sender, recipient, id=None):
        self.sender = sender
        self.recipient = recipient
        self.id = id

    def to_xml_element(self):
        """
        Build the domish element for this stanza.

        'from' and 'to' are always set from the sender and recipient URIs;
        'type' is only written when it is truthy and 'id' only when it is
        not None.
        """
        element = domish.Element((None, self.stanza_type))
        for attribute, party in (('from', self.sender), ('to', self.recipient)):
            element[attribute] = party.uri.as_string('xmpp')
        if self.type:
            element['type'] = self.type
        if self.id is not None:
            element['id'] = self.id
        return element
class ErrorStanza(object):
    """
    Stanza representing an error of another stanza. It's not a base stanza type on its own.

    conditions is an iterable of (condition, namespace) pairs; each pair
    becomes one child of the <error/> element.
    """

    def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None):
        self.stanza_type = stanza_type
        self.sender = sender
        self.recipient = recipient
        self.id = id
        self.conditions = conditions
        self.error_type = error_type

    @classmethod
    def from_stanza(cls, stanza, error_type, conditions):
        """
        Create the error reply for stanza, reusing its stanza type and id.
        """
        # In error stanzas sender and recipient are swapped
        return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id)

    def to_xml_element(self):
        """
        Build the domish element: the stanza element with type 'error' and a
        single <error/> child holding one element per condition.
        """
        xml_element = domish.Element((None, self.stanza_type))
        xml_element['from'] = self.sender.uri.as_string('xmpp')
        xml_element['to'] = self.recipient.uri.as_string('xmpp')
        xml_element['type'] = 'error'
        if self.id is not None:
            xml_element['id'] = self.id
        error_element = domish.Element((None, 'error'))
        error_element['type'] = self.error_type
        # Plain loop: the children are added for their side effect only, so
        # there is no point in building a throwaway list of return values
        for condition, ns in self.conditions:
            error_element.addChild(domish.Element((ns, condition)))
        xml_element.addChild(error_element)
        return xml_element
class BaseMessageStanza(BaseStanza):
    """
    Base for <message/> stanzas with an optional plain and/or HTML body.

    When only html_body is given, the plain body is derived from it with
    html2text; otherwise both values are stored exactly as passed.
    """

    stanza_type = 'message'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        super(BaseMessageStanza, self).__init__(sender, recipient, id=id)
        self.use_receipt = use_receipt
        if body is None and html_body is not None:
            # Only the HTML variant was supplied, derive the plain text from it
            body = html2text(html_body)
        self.body = body
        self.html_body = html_body

    def to_xml_element(self):
        """
        Build the message element, adding a receipt request and the body
        elements which apply.

        The receipt request is only added when the stanza has an id, the
        recipient URI has a resource and use_receipt is set.
        """
        element = super(BaseMessageStanza, self).to_xml_element()
        receipt_wanted = self.id is not None and self.recipient.uri.resource is not None and self.use_receipt
        if receipt_wanted:
            element.addElement('request', defaultUri=RECEIPTS_NS)
        for tag, content in (('body', self.body), ('html', self.html_body)):
            if content is not None:
                element.addElement(tag, content=content)
        return element
class NormalMessage(BaseMessageStanza):
    """
    Message stanza without a 'type' attribute which must carry a body.

    Raises ValueError when neither body nor html_body is given.
    """

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        if body is None and html_body is None:
            raise ValueError('either body or html_body need to be set')
        super(NormalMessage, self).__init__(
            sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)
class ChatMessage(BaseMessageStanza):
    """
    Message stanza of type 'chat' which must carry a body.

    Receipts are requested by default. Raises ValueError when neither body
    nor html_body is given.
    """

    type = 'chat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=True):
        if body is None and html_body is None:
            raise ValueError('either body or html_body need to be set')
        super(ChatMessage, self).__init__(
            sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)

    def to_xml_element(self):
        """
        Build the message element and append an <active/> chat state to it.
        """
        element = super(ChatMessage, self).to_xml_element()
        element.addElement('active', defaultUri=CHATSTATES_NS)
        return element
class ChatComposingIndication(BaseMessageStanza):
    """
    Body-less message of type 'chat' which carries a single chat state.

    state is used verbatim as the name of the chat state element.
    """

    type = 'chat'

    def __init__(self, sender, recipient, state, id=None, use_receipt=False):
        super(ChatComposingIndication, self).__init__(sender, recipient, id=id, use_receipt=use_receipt)
        self.state = state

    def to_xml_element(self):
        """
        Build the message element and append the chat state element to it.
        """
        element = super(ChatComposingIndication, self).to_xml_element()
        element.addElement(self.state, defaultUri=CHATSTATES_NS)
        return element
class GroupChatMessage(BaseMessageStanza):
    """
    Message stanza of type 'groupchat' which must carry a body.

    Receipts are never requested. Raises ValueError when neither body nor
    html_body is given.
    """

    type = 'groupchat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None):
        # TODO: add timestamp
        if body is None and html_body is None:
            raise ValueError('either body or html_body need to be set')
        super(GroupChatMessage, self).__init__(
            sender, recipient, body=body, html_body=html_body, id=id, use_receipt=False)
class MessageReceipt(BaseMessageStanza):
    """
    Body-less message which carries a <received/> receipt element.

    receipt_id is written to the 'id' attribute of the <received/> element.
    """

    def __init__(self, sender, recipient, receipt_id, id=None):
        super(MessageReceipt, self).__init__(sender, recipient, id=id, use_receipt=False)
        self.receipt_id = receipt_id

    def to_xml_element(self):
        """
        Build the message element and append the <received/> element to it.
        """
        element = super(MessageReceipt, self).to_xml_element()
        received = domish.Element((RECEIPTS_NS, 'received'))
        received['id'] = self.receipt_id
        element.addChild(received)
        return element
class BasePresenceStanza(BaseStanza):
    """
    Base for stanzas whose top level element is <presence/>.
    """

    stanza_type = 'presence'
class IncomingInvitationMessage(BaseMessageStanza):
    """
    Body-less message which carries a multi-user chat invitation whose
    <invite/> element is addressed to invited_user.

    reason is optional and only written when it is truthy.
    """

    def __init__(self, sender, recipient, invited_user, reason=None, id=None):
        super(IncomingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.invited_user = invited_user
        self.reason = reason

    def to_xml_element(self):
        """
        Build the message element with an <x/> child in the MUC user
        namespace which contains the <invite/> element.
        """
        element = super(IncomingInvitationMessage, self).to_xml_element()
        muc_user = element.addElement((MUC_USER_NS, 'x'))
        invite = muc_user.addElement('invite')
        invite['to'] = self.invited_user.uri.as_string('xmpp')
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
+
+
class OutgoingInvitationMessage(BaseMessageStanza):
    """
    Body-less message which carries a multi-user chat invitation whose
    <invite/> element names originator in its 'from' attribute.

    reason is optional and only written when it is truthy.
    """

    def __init__(self, sender, recipient, originator, reason=None, id=None):
        super(OutgoingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.originator = originator
        self.reason = reason

    def to_xml_element(self):
        """
        Build the message element with an <x/> child in the MUC user
        namespace which contains the <invite/> element.
        """
        element = super(OutgoingInvitationMessage, self).to_xml_element()
        muc_user = element.addElement((MUC_USER_NS, 'x'))
        invite = muc_user.addElement('invite')
        invite['from'] = self.originator.uri.as_string('xmpp')
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
+
+
class AvailabilityPresence(BasePresenceStanza):
    """
    Presence stanza announcing that an entity is available or unavailable.

    statuses maps a language tag (or None for no language) to a status text.
    Assigning to available also updates type: it becomes None when available
    and 'unavailable' otherwise.
    """

    def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None):
        super(AvailabilityPresence, self).__init__(sender, recipient, id=id)
        self.available = available
        self.show = show
        self.priority = priority
        self.statuses = statuses or {}

    def _get_available(self):
        return self.__dict__['available']

    def _set_available(self, available):
        # Keep the stanza's 'type' attribute in sync with the availability
        if available:
            self.type = None
        else:
            self.type = 'unavailable'
        self.__dict__['available'] = available

    available = property(_get_available, _set_available)
    del _get_available, _set_available

    @property
    def status(self):
        """
        The status text stored without a language or, failing that, any one
        of the stored status texts. None when there are no statuses.
        """
        status = self.statuses.get(None)
        if status is None:
            status = next(self.statuses.itervalues(), None)
        return status

    def to_xml_element(self):
        """
        Build the presence element.

        show, priority (when not 0) and the caps element are only written for
        available presence; the status texts are written in both cases.
        """
        # NOTE(review): the nesting below was reconstructed from a listing
        # with stripped indentation -- confirm against the repository.
        # Name this class in super() so a to_xml_element defined on
        # BasePresenceStanza would not be silently skipped.
        xml_element = super(AvailabilityPresence, self).to_xml_element()
        if self.available:
            if self.show is not None:
                xml_element.addElement('show', content=self.show)
            if self.priority != 0:
                xml_element.addElement('priority', content=unicode(self.priority))
            caps = xml_element.addElement('c', defaultUri=CAPS_NS)
            caps['node'] = 'http://sylkserver.com'
            caps['ver'] = SYLK_VERSION
            if SYLK_CAPS:
                caps['ext'] = ' '.join(SYLK_CAPS)
        for lang, text in self.statuses.iteritems():
            status = xml_element.addElement('status', content=text)
            if lang:
                status[(XML_NS, 'lang')] = lang
        return xml_element
class SubscriptionPresence(BasePresenceStanza):
    """
    Presence stanza whose 'type' attribute is chosen per instance.
    """

    def __init__(self, sender, recipient, type, id=None):
        super(SubscriptionPresence, self).__init__(sender, recipient, id=id)
        # Overrides the class level default (None) for this instance only
        self.type = type
class ProbePresence(BasePresenceStanza):
    """
    Presence stanza of type 'probe'.
    """

    type = 'probe'
class MUCAvailabilityPresence(AvailabilityPresence):
    """
    Availability presence extended with a multi-user chat <x/> element.

    affiliation defaults to 'member' and role to 'participant' when they are
    not given; muc_statuses is a list of status codes, each written as its
    own <status/> element.
    """

    def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None, affiliation=None, jid=None, role=None, muc_statuses=None):
        super(MUCAvailabilityPresence, self).__init__(
            sender, recipient, available=available, show=show, statuses=statuses, priority=priority, id=id)
        self.affiliation = affiliation or 'member'
        self.role = role or 'participant'
        self.muc_statuses = muc_statuses or []
        self.jid = jid

    def to_xml_element(self):
        """
        Build the presence element and append the <x/> element with its
        <item/> child and one <status/> child per status code.
        """
        element = super(MUCAvailabilityPresence, self).to_xml_element()
        muc_user = element.addElement('x', defaultUri=MUC_USER_NS)
        item = muc_user.addElement('item')
        for attribute, value in (('affiliation', self.affiliation), ('role', self.role)):
            if value:
                item[attribute] = value
        if self.jid:
            item['jid'] = self.jid.uri.as_string('xmpp')
        for code in self.muc_statuses:
            muc_user.addElement('status')['code'] = code
        return element
class MUCErrorPresence(ErrorStanza):
    """
    Error stanza which additionally carries an empty <x/> element in the
    MUC user namespace.
    """

    def to_xml_element(self):
        """
        Build the error element and append the empty <x/> element to it.
        """
        element = super(MUCErrorPresence, self).to_xml_element()
        element.addElement('x', defaultUri=MUC_USER_NS)
        return element

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:04 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408789
Default Alt Text
(88 KB)

Event Timeline