Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py
index 67be9f1..07300df 100644
--- a/sylk/applications/xmppgateway/__init__.py
+++ b/sylk/applications/xmppgateway/__init__.py
@@ -1,500 +1,502 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import os
from application.notification import IObserver, NotificationCenter
from application.python import Null
from sipsimple.core import SIPURI, SIPCoreError
from sipsimple.payloads import ParserError
from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage
from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError
from sipsimple.threading.green import run_in_green_thread
from zope.interface import implements
from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource
from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler
from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler
from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler
from sylk.applications.xmppgateway.util import format_uri
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession
from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage
# Module-level application logger, named after the last path component of the
# directory that contains this file.
log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1])
# Application object that relays chat sessions, SIP MESSAGE requests, presence
# subscriptions and multiparty invitations between SIP and XMPP. It observes
# its handlers and dispatches their notifications to the _NH_* methods.
class XMPPGatewayApplication(object):
# Python 2 style metaclass declaration.
__metaclass__ = SylkApplication
implements(ISylkApplication, IObserver)
# NOTE(review): presumably the name under which the server registers this application - confirm in sylk.applications.
__appname__ = 'xmppgateway'
def __init__(self):
    """Create the XMPP manager and the (initially empty) handler registries."""
    self.xmpp_manager = XMPPManager()
    # Chat handlers: the ones still being set up, and the established ones
    self.pending_sessions, self.chat_sessions = {}, set()
    # One registry per direction (s2x / x2s) for every kind of handler
    self.s2x_muc_sessions, self.x2s_muc_sessions = {}, {}
    self.s2x_presence_subscriptions, self.x2s_presence_subscriptions = {}, {}
    self.s2x_muc_add_participant_handlers, self.x2s_muc_add_participant_handlers = {}, {}
def start(self):
    """Observe the XMPP manager's notifications, then start the manager."""
    manager = self.xmpp_manager
    NotificationCenter().add_observer(self, sender=manager)
    manager.start()
def stop(self):
    """Stop observing the XMPP manager's notifications, then stop the manager."""
    manager = self.xmpp_manager
    NotificationCenter().remove_observer(self, sender=manager)
    manager.stop()
# Handle an incoming SIP session. Sessions that do not propose a 'chat' stream
# are rejected with 488. A session coming from a MUC domain whose remote Contact
# carries the 'isfocus' parameter is handled as a multiparty invitation. Any
# other session gets a ChatSessionHandler stored in self.pending_sessions under
# the key (sip_leg_uri, xmpp_leg_uri).
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below (in particular the `return` that follows the 480 rejection)
# must be confirmed against the indented source.
def incoming_session(self, session):
log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
try:
# First proposed stream whose type is 'chat'; StopIteration means there is none
msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next()
except StopIteration:
log.msg('Session rejected: Only MSRP media is supported')
session.reject(488, 'Only MSRP media is supported')
return
# Check if this session is really an invitation to add a participant to a conference room / muc
if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters:
try:
referred_by_uri = SIPURI.parse(session.transfer_info.referred_by)
except SIPCoreError:
log.msg("SIP multiparty session invitation %s failed: invalid Referred-By header" % session._invitation.call_id)
session.reject(488)
return
# Only user and host are kept from each URI
muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host)
inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host)
recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host)
sender = Identity(muc_uri)
recipient = Identity(recipient_uri)
inviter = Identity(inviter_uri)
try:
# An existing entry means an invitation for this room/recipient pair is already in progress
handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)]
except KeyError:
handler = S2XMucInvitationHandler(session, sender, recipient, inviter)
self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.start()
else:
log.msg("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session._invitation.call_id,
format_uri(inviter_uri, 'sip'),
format_uri(recipient_uri, 'xmpp')))
session.reject(480)
return
# Check domain
if session.remote_identity.uri.host not in XMPPGatewayConfig.domains:
log.msg('Session rejected: From domain is not a local XMPP domain')
session.reject(606, 'Not Acceptable')
return
# Get URI representing the SIP side
contact_uri = session._invitation.remote_contact_header.uri
if contact_uri.parameters.get('gr') is not None:
sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr'))
else:
# No 'gr' parameter in the Contact URI: use the remote identity with a generated resource
tmp = session.remote_identity.uri
sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource())
# Get URI representing the XMPP side
request_uri = session._invitation.request_uri
remote_resource = request_uri.parameters.get('gr', None)
if remote_resource is not None:
try:
remote_resource = decode_resource(remote_resource)
except (TypeError, UnicodeError):
# Decoding failed: the 'gr' value is kept as received
pass
xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
pass
else:
# There is another pending session with same identifiers, can't accept this one
log.msg('Session rejected: other session with same identifiers in progress')
session.reject(488)
return
sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name)
handler = ChatSessionHandler.new_from_sip_session(sip_identity, session)
NotificationCenter().add_observer(self, sender=handler)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
if xmpp_leg_uri.resource is not None:
# Incoming session target contained GRUU, so create XMPPChatSession immediately
xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri))
handler.xmpp_identity = xmpp_session.remote_identity
handler.xmpp_session = xmpp_session
# Handle an incoming SIP SUBSCRIBE. Requests without From/To headers are rejected
# with 400, events other than 'presence' with 489, and From domains that are not
# configured in XMPPGatewayConfig.domains with 606. Accepted requests are attached
# to the S2XPresenceHandler stored under (sip_leg_uri, xmpp_leg_uri), which is
# created and started first when it does not exist yet.
# The lines prefixed with '-' / '+' are the diff's removed / added lines.
def incoming_subscription(self, subscribe_request, data):
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
if Null in (from_header, to_header):
subscribe_request.reject(400)
return
- log.msg('New SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp')))
+ log.msg('SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp')))
if subscribe_request.event != 'presence':
- log.msg('Subscription rejected: only presence event is supported')
+ log.msg('SIP Subscription rejected: only presence event is supported')
subscribe_request.reject(489)
return
# Check domain
remote_identity_uri = data.headers['From'].uri
if remote_identity_uri.host not in XMPPGatewayConfig.domains:
- log.msg('Subscription rejected: From domain is not a local XMPP domain')
+ log.msg('SIP Subscription rejected: From domain is not a local XMPP domain')
subscribe_request.reject(606)
return
# Get URI representing the SIP side
sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host)
# Get URI representing the XMPP side
request_uri = data.request_uri
xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host)
try:
handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# No presence flow for this pair yet: create, register, observe and start one
sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name)
xmpp_identity = Identity(xmpp_leg_uri)
handler = S2XPresenceHandler(sip_identity, xmpp_identity)
self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.start()
# The diff removes this else branch and its log line
- else:
- log.msg('New SIP subscription from %s to %s added to presence flow 0x%x' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp'), id(handler)))
handler.add_sip_subscription(subscribe_request)
def incoming_referral(self, refer_request, data):
    """Reject every incoming referral with status code 405; `data` is not used."""
    rejection_code = 405
    refer_request.reject(rejection_code)
# Handle an incoming SIP MESSAGE request and relay it to XMPP. Requests that
# lack Content-Type, From or To are answered with 400, and From domains not in
# XMPPGatewayConfig.domains with 606. A 'message/cpim' body is unwrapped first.
# text/plain and text/html bodies are sent as NormalMessage or ChatMessage
# stanzas; is-composing documents become ChatComposingIndication stanzas, but
# only when MSRP is not used for chat.
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below must be confirmed against the indented source.
def incoming_sip_message(self, message_request, data):
content_type = data.headers.get('Content-Type', Null).content_type
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
if Null in (content_type, from_header, to_header):
message_request.answer(400)
return
log.msg('New SIP Message from %s to %s' % (from_header.uri, to_header.uri))
# Check domain
if from_header.uri.host not in XMPPGatewayConfig.domains:
log.msg('Message rejected: From domain is not a local XMPP domain')
message_request.answer(606)
return
if content_type == 'message/cpim':
try:
cpim_message = CPIMMessage.parse(data.body)
except CPIMParserError:
log.msg('Message rejected: CPIM parse error')
message_request.answer(400)
return
else:
# Use the body, content type and (when present) sender of the wrapped CPIM message
body = cpim_message.body
content_type = cpim_message.content_type
sender = cpim_message.sender or from_header
from_uri = sender.uri
else:
body = data.body
from_uri = from_header.uri
to_uri = str(to_header.uri)
message_request.answer(200)
if from_uri.parameters.get('gr', None) is None:
# The sender URI has no 'gr' parameter: add a generated one to a copy of the URI
from_uri = SIPURI.new(from_uri)
from_uri.parameters['gr'] = generate_sylk_resource()
sender = Identity(FrozenURI.parse(from_uri))
recipient = Identity(FrozenURI.parse(to_uri))
if content_type in ('text/plain', 'text/html'):
if content_type == 'text/plain':
html_body = None
else:
# HTML content is passed as html_body only
html_body = body
body = None
if XMPPGatewayConfig.use_msrp_for_chat:
message = NormalMessage(sender, recipient, body, html_body, use_receipt=False)
self.xmpp_manager.send_stanza(message)
else:
message = ChatMessage(sender, recipient, body, html_body, use_receipt=False)
self.xmpp_manager.send_stanza(message)
elif content_type == IsComposingDocument.content_type:
if not XMPPGatewayConfig.use_msrp_for_chat:
try:
msg = IsComposingMessage.parse(body)
except ParserError:
# Unparsable is-composing documents are dropped silently
pass
else:
# 'active' maps to 'composing'; every other state maps to 'paused'
state = 'composing' if msg.state == 'active' else 'paused'
message = ChatComposingIndication(sender, recipient, state, use_receipt=False)
self.xmpp_manager.send_stanza(message)
def handle_notification(self, notification):
    """Dispatch a notification to the `_NH_<name>` method; unknown names fall back to Null."""
    method_name = '_NH_%s' % notification.name
    getattr(self, method_name, Null)(notification)
# Out of band XMPP stanza handling
# Handle an XMPP chat message received outside an established chat session.
# With MSRP enabled, the message is queued on the ChatSessionHandler found in
# self.pending_sessions (a new handler is created when none matches). Otherwise
# it is sent as a SIP MESSAGE through SIPMessageSender.
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below must be confirmed against the indented source.
@run_in_green_thread
def _NH_XMPPGotChatMessage(self, notification):
# This notification is only processed here until the ChatSessionHandler
# has both (SIP and XMPP) sessions established
message = notification.data.message
sender = message.sender
recipient = message.recipient
if XMPPGatewayConfig.use_msrp_for_chat:
if recipient.uri.resource is None:
# If recipient resource is not set the session is started from
# the XMPP side
sip_leg_uri = FrozenURI.new(recipient.uri)
xmpp_leg_uri = FrozenURI.new(sender.uri)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# Not found, need to create a new handler and an outgoing SIP session
xmpp_identity = Identity(xmpp_leg_uri)
handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.enqueue_xmpp_message(message)
else:
# Find handler pending XMPP confirmation
# First lookup: recipient URI as received, sender URI reduced to user and host
sip_leg_uri = FrozenURI.new(recipient.uri)
xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# Find handler pending XMPP confirmation
# Second lookup: recipient URI reduced to user and host, sender URI as received
sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host)
xmpp_leg_uri = FrozenURI.new(sender.uri)
try:
handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)]
except KeyError:
# It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID
xmpp_identity = Identity(xmpp_leg_uri)
handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri)
key = (sip_leg_uri, xmpp_leg_uri)
self.pending_sessions[key] = handler
NotificationCenter().add_observer(self, sender=handler)
handler.enqueue_xmpp_message(message)
else:
# Found handler, create XMPP session and establish session
session = XMPPChatSession(local_identity=recipient, remote_identity=sender)
handler.enqueue_xmpp_message(message)
handler.xmpp_identity = session.remote_identity
handler.xmpp_session = session
else:
sip_message_sender = SIPMessageSender(message)
try:
sip_message_sender.send().wait()
except SIPMessageError as e:
# TODO report back an error stanza
log.error('Error sending SIP Message: %s' % e)
@run_in_green_thread
def _NH_XMPPGotNormalMessage(self, notification):
    """Relay an XMPP normal message as a SIP MESSAGE and log a failed send."""
    relay = SIPMessageSender(notification.data.message)
    try:
        relay.send().wait()
    except SIPMessageError as e:
        # TODO report back an error stanza
        log.error('Error sending SIP Message: %s' % e)
@run_in_green_thread
def _NH_XMPPGotComposingIndication(self, notification):
    """Relay an XMPP composing indication as a SIP is-composing MESSAGE.

    Nothing is sent when MSRP is used for chat.
    """
    indication = notification.data.composing_indication
    sender = indication.sender
    recipient = indication.recipient
    if XMPPGatewayConfig.use_msrp_for_chat:
        return
    # 'composing' maps to 'active'; every other state maps to 'idle'
    if indication.state == 'composing':
        state = 'active'
    else:
        state = 'idle'
    # A refresh of 30 is used when the indication carries no interval
    refresh = indication.interval or 30
    body = IsComposingMessage(state=state, refresh=refresh).toxml()
    relay = SIPMessageSender(NormalMessage(sender, recipient, body, IsComposingDocument.content_type))
    try:
        relay.send().wait()
    except SIPMessageError as e:
        # TODO report back an error stanza
        log.error('Error sending SIP Message: %s' % e)
def _NH_XMPPGotPresenceSubscriptionRequest(self, notification):
    """Start an X2SPresenceHandler for an XMPP presence request, unless one
    is already registered for the same (bare sender URI, recipient URI) pair.
    """
    stanza = notification.data.stanza
    # Disregard the resource part, the presence request could be a probe instead of a subscribe
    full_uri = stanza.sender.uri
    bare_uri = FrozenURI(full_uri.user, full_uri.host)
    key = (bare_uri, stanza.recipient.uri)
    if key in self.x2s_presence_subscriptions:
        return
    xmpp_identity = stanza.sender
    xmpp_identity.uri = bare_uri
    sip_identity = stanza.recipient
    handler = X2SPresenceHandler(sip_identity, xmpp_identity)
    self.x2s_presence_subscriptions[key] = handler
    notification.center.add_observer(self, sender=handler)
    handler.start()
# Handle an XMPP request to join a multiparty chat: when no X2SMucHandler exists
# for (sender URI, room URI), one is created, observed and started. A handler
# registered in self.s2x_muc_add_participant_handlers for the same room and
# bare sender URI is stopped.
# NOTE(review): this listing has lost its indentation; whether the second
# try block is nested inside the `except KeyError` branch must be confirmed
# against the indented source.
def _NH_XMPPGotMucJoinRequest(self, notification):
stanza = notification.data.stanza
# The room URI keeps only user and host; the resource part is used as the nickname
muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host)
nickname = stanza.recipient.uri.resource
try:
handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)]
except KeyError:
xmpp_identity = stanza.sender
sip_identity = stanza.recipient
# Replaces the recipient's URI on the stanza's own identity object
sip_identity.uri = muc_uri
handler = X2SMucHandler(sip_identity, xmpp_identity, nickname)
handler._first_stanza = stanza
notification.center.add_observer(self, sender=handler)
handler.start()
# Check if there was a pending join request on the SIP side
try:
handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))]
except KeyError:
pass
else:
handler.stop()
def _NH_XMPPGotMucAddParticipantRequest(self, notification):
    """Start an X2SMucInvitationHandler for an XMPP add-participant request,
    unless one is already registered for the same (room, participant) pair.
    """
    data = notification.data
    # Only user and host are kept from each URI
    muc_uri = FrozenURI(data.recipient.uri.user, data.recipient.uri.host)
    sender_uri = FrozenURI(data.sender.uri.user, data.sender.uri.host)
    participant_uri = FrozenURI(data.participant.uri.user, data.participant.uri.host)
    sender = Identity(sender_uri)
    recipient = Identity(muc_uri)
    participant = Identity(participant_uri)
    key = (muc_uri, participant_uri)
    if key in self.x2s_muc_add_participant_handlers:
        return
    handler = X2SMucInvitationHandler(sender, recipient, participant)
    self.x2s_muc_add_participant_handlers[key] = handler
    notification.center.add_observer(self, sender=handler)
    handler.start()
# Chat session handling
def _NH_ChatSessionDidStart(self, notification):
    """Move an established chat handler from pending_sessions to chat_sessions."""
    handler = notification.sender
    endpoints = (handler.sip_identity.uri, handler.xmpp_identity.uri)
    log.msg('Chat session established sip:%s <--> xmpp:%s' % endpoints)
    # The handler is registered under at most one key; find it by identity
    pending_key = next((key for key, candidate in self.pending_sessions.items() if candidate is handler), None)
    if pending_key is not None:
        del self.pending_sessions[pending_key]
    self.chat_sessions.add(handler)
def _NH_ChatSessionDidEnd(self, notification):
    """Forget an ended chat handler and stop observing it."""
    handler = notification.sender
    endpoints = (handler.sip_identity.uri, handler.xmpp_identity.uri)
    log.msg('Chat session ended sip:%s <--> xmpp:%s' % endpoints)
    self.chat_sessions.remove(handler)
    notification.center.remove_observer(self, sender=handler)
def _NH_ChatSessionDidFail(self, notification):
    """Drop a failed chat handler from pending_sessions and stop observing it.

    The failure is logged with the URI pair the handler was registered under.
    When the handler is not found in pending_sessions, a generic message is
    logged instead of failing on the tuple unpacking, so the observer is
    always removed.
    """
    handler = notification.sender
    # The handler is registered under at most one key; find it by identity
    uris = next((key for key, candidate in self.pending_sessions.items() if candidate is handler), None)
    if uris is not None:
        del self.pending_sessions[uris]
        sip_uri, xmpp_uri = uris
        log.msg('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason))
    else:
        log.msg('Chat session failed (%s)' % notification.data.reason)
    notification.center.remove_observer(self, sender=handler)
# Presence handling
# Log that a SIP --> XMPP presence flow was established.
def _NH_S2XPresenceHandlerDidStart(self, notification):
handler = notification.sender
log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp')))
# Added by the diff: log how many entries each presence registry currently holds
+ log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys())))
# Unregister an ended SIP --> XMPP presence flow and stop observing its handler.
# The diff moves the "ended" log line after the registry update ('-' is the old
# position, '+' the new one) and adds a log line with the registry sizes.
def _NH_S2XPresenceHandlerDidEnd(self, notification):
handler = notification.sender
- log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp')))
# pop() with a default: a handler that is not registered is ignored
self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None)
notification.center.remove_observer(self, sender=handler)
+ log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp')))
+ log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys())))
# Log that an XMPP --> SIP presence flow was established.
def _NH_X2SPresenceHandlerDidStart(self, notification):
handler = notification.sender
log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip')))
# Added by the diff: log how many entries each presence registry currently holds
+ log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys())))
# Unregister an ended XMPP --> SIP presence flow and stop observing its handler.
# The diff moves the "ended" log line after the registry update ('-' is the old
# position, '+' the new one) and adds a log line with the registry sizes.
def _NH_X2SPresenceHandlerDidEnd(self, notification):
handler = notification.sender
- log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip')))
# pop() with a default: a handler that is not registered is ignored
self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None)
notification.center.remove_observer(self, sender=handler)
+ log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip')))
+ log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys())))
# MUC handling
def _NH_X2SMucHandlerDidStart(self, notification):
    """Register a started XMPP --> SIP multiparty handler under its URI pair."""
    handler = notification.sender
    key = (handler.xmpp_identity.uri, handler.sip_identity.uri)
    log.msg('Multiparty session established xmpp:%s --> sip:%s' % key)
    self.x2s_muc_sessions[key] = handler
def _NH_X2SMucHandlerDidEnd(self, notification):
    """Unregister an ended XMPP --> SIP multiparty handler and stop observing it."""
    handler = notification.sender
    key = (handler.xmpp_identity.uri, handler.sip_identity.uri)
    log.msg('Multiparty session ended xmpp:%s --> sip:%s' % key)
    # A handler that is not registered is ignored
    self.x2s_muc_sessions.pop(key, None)
    notification.center.remove_observer(self, sender=handler)
def _NH_X2SMucInvitationHandlerDidStart(self, notification):
    """Log that an XMPP user invited a participant to a multiparty chat."""
    handler = notification.sender
    inviter = format_uri(handler.sender.uri, 'xmpp')
    invitee = format_uri(handler.participant.uri)
    room = format_uri(handler.recipient.uri, 'sip')
    log.msg('%s invited %s to multiparty chat %s' % (inviter, invitee, room))
def _NH_X2SMucInvitationHandlerDidEnd(self, notification):
    """Log a completed XMPP --> SIP invitation, unregister its handler and stop observing it."""
    handler = notification.sender
    key = (handler.recipient.uri, handler.participant.uri)
    room_uri, invitee_uri = key
    inviter = format_uri(handler.sender.uri, 'xmpp')
    log.msg('%s added %s to multiparty chat %s' % (inviter, format_uri(invitee_uri), format_uri(room_uri, 'sip')))
    del self.x2s_muc_add_participant_handlers[key]
    notification.center.remove_observer(self, sender=handler)
def _NH_X2SMucInvitationHandlerDidFail(self, notification):
    """Log a failed XMPP --> SIP invitation, unregister its handler and stop observing it."""
    handler = notification.sender
    key = (handler.recipient.uri, handler.participant.uri)
    room_uri, invitee_uri = key
    inviter = format_uri(handler.sender.uri, 'xmpp')
    details = (inviter, format_uri(invitee_uri), format_uri(room_uri, 'sip'), notification.data.failure)
    log.msg('%s could not add %s to multiparty chat %s: %s' % details)
    del self.x2s_muc_add_participant_handlers[key]
    notification.center.remove_observer(self, sender=handler)
def _NH_S2XMucInvitationHandlerDidStart(self, notification):
    """Log that a SIP user invited an XMPP recipient to a multiparty chat."""
    handler = notification.sender
    inviter = format_uri(handler.inviter.uri, 'sip')
    invitee = format_uri(handler.recipient.uri, 'xmpp')
    room = format_uri(handler.sender.uri, 'sip')
    log.msg("%s invited %s to multiparty chat %s" % (inviter, invitee, room))
def _NH_S2XMucInvitationHandlerDidEnd(self, notification):
    """Log a completed SIP --> XMPP invitation, unregister its handler and stop observing it."""
    handler = notification.sender
    key = (handler.sender.uri, handler.recipient.uri)
    room_uri, invitee_uri = key
    inviter = format_uri(handler.inviter.uri, 'sip')
    log.msg('%s added %s to multiparty chat %s' % (inviter, format_uri(invitee_uri, 'xmpp'), format_uri(room_uri, 'sip')))
    del self.s2x_muc_add_participant_handlers[key]
    notification.center.remove_observer(self, sender=handler)
def _NH_S2XMucInvitationHandlerDidFail(self, notification):
    """Log a failed SIP --> XMPP invitation, unregister its handler and stop observing it."""
    handler = notification.sender
    key = (handler.sender.uri, handler.recipient.uri)
    room_uri, invitee_uri = key
    inviter = format_uri(handler.inviter.uri, 'sip')
    details = (inviter, format_uri(invitee_uri, 'xmpp'), format_uri(room_uri, 'sip'), str(notification.data.failure))
    log.msg('%s could not add %s to multiparty chat %s: %s' % details)
    del self.s2x_muc_add_participant_handlers[key]
    notification.center.remove_observer(self, sender=handler)
diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py
index cf25e26..818fa5d 100644
--- a/sylk/applications/xmppgateway/presence.py
+++ b/sylk/applications/xmppgateway/presence.py
@@ -1,488 +1,490 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import hashlib
import os
import random
from application.notification import IObserver, NotificationCenter
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError
from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader
from sipsimple.core import Subscription
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import pidf, rpid, caps
from sipsimple.payloads import ParserError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications import ApplicationLogger
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.util import format_uri
from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence
from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription
from sylk.configuration import SIPConfig
# Module-level application logger, named after the last path component of the
# directory that contains this file.
log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1])
# Names exported by `from ... import *`
__all__ = ['S2XPresenceHandler', 'X2SPresenceHandler']
# Presence flow in the SIP --> XMPP direction: it holds one XMPPSubscription and
# a list of incoming SIP subscriptions, and pushes a PIDF document built from
# the received XMPP presence stanzas to those SIP subscriptions.
class S2XPresenceHandler(object):
implements(IObserver)
# NOTE(review): WriteOnceAttribute presumably allows a single assignment per instance - confirm in application.python.descriptor.
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
    """Store both identities and reset the per-flow state."""
    self.sip_identity = sip_identity
    self.xmpp_identity = xmpp_identity
    self.ended = False
    # Last serialized PIDF document, if any
    self._pidf = None
    # Latest presence stanza per sender URI
    self._stanza_cache = {}
    self._sip_subscriptions = []
    self._xmpp_subscription = None
def start(self):
    """Create, observe and start the XMPP subscription, then announce the start."""
    center = NotificationCenter()
    xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
    self._xmpp_subscription = xmpp_subscription
    center.add_observer(self, sender=xmpp_subscription)
    xmpp_subscription.start()
    center.post_notification('S2XPresenceHandlerDidStart', sender=self)
def end(self):
    """End the XMPP subscription and every SIP subscription, then announce the end.

    Calling it again after the handler has ended does nothing.
    """
    if self.ended:
        return
    center = NotificationCenter()
    xmpp_subscription = self._xmpp_subscription
    if xmpp_subscription is not None:
        center.remove_observer(self, sender=xmpp_subscription)
        xmpp_subscription.end()
        self._xmpp_subscription = None
    remaining = self._sip_subscriptions
    while remaining:
        sip_subscription = remaining.pop()
        center.remove_observer(self, sender=sip_subscription)
        try:
            sip_subscription.end()
        except SIPCoreError:
            # Errors while ending a SIP subscription are ignored
            pass
    self.ended = True
    center.post_notification('S2XPresenceHandlerDidEnd', sender=self)
# Attach an incoming SIP subscription to this flow and start observing it. It is
# accepted right away (with the cached PIDF document, if there is one) when the
# XMPP subscription is 'active', and marked as pending otherwise.
def add_sip_subscription(self, subscription):
self._sip_subscriptions.append(subscription)
NotificationCenter().add_observer(self, sender=subscription)
if self._xmpp_subscription.state == 'active':
pidf_doc = self._pidf
# No content type is passed when there is no document to send
content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None
subscription.accept(content_type, pidf_doc)
else:
subscription.accept_pending()
# Added by the diff: log the attachment together with the current subscription count
+ log.msg('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
# Build a PIDF document from the cached XMPP presence stanzas, store its XML
# serialization in self._pidf and return it. With an empty cache self._pidf is
# reset to None and None is returned. Each cached stanza becomes one service
# entry; a single person entry collects the 'away' / 'busy' activities.
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below must be confirmed against the indented source.
def _build_pidf(self):
if not self._stanza_cache:
self._pidf = None
return None
pidf_doc = pidf.PIDF(str(self.xmpp_identity))
# The person id is derived from the first key of the stanza cache
uri = self._stanza_cache.iterkeys().next()
person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest())
person.activities = rpid.Activities()
pidf_doc.add(person)
for stanza in self._stanza_cache.itervalues():
if not stanza.available:
status = pidf.Status('closed')
status.extended = 'offline'
else:
status = pidf.Status('open')
# XMPP 'show' values: 'away' and 'xa' map to 'away', 'dnd' to 'busy', anything else to 'available'
if stanza.show == 'away':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'xa':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'dnd':
status.extended = 'busy'
if 'busy' not in person.activities:
person.activities.add('busy')
else:
status.extended = 'available'
if stanza.sender.uri.resource:
resource = encode_resource(stanza.sender.uri.resource)
else:
# Workaround for clients not sending the resource under certain (unknown) circumstances
# NOTE(review): this hashes `uri` (the first cache key), not this stanza's own sender URI - confirm that is intended
resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()
service_id = "SID-%s" % resource
sip_uri = stanza.sender.uri.as_sip_uri()
sip_uri.parameters['gr'] = resource
sip_uri.parameters['xmpp'] = None
contact = pidf.Contact(str(sip_uri))
service = pidf.Service(service_id, status=status, contact=contact)
service.add(pidf.DeviceID(resource))
service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource)
service.capabilities = caps.ServiceCapabilities(text=True, message=True)
for lang, note in stanza.statuses.iteritems():
service.notes.add(pidf.PIDFNote(note, lang=lang))
pidf_doc.add(service)
# An empty activities element is dropped
if not person.activities:
person.activities = None
self._pidf = pidf_doc.toxml()
return self._pidf
def handle_notification(self, notification):
    """Dispatch a notification to the `_NH_<name>` method; unknown names fall back to Null."""
    method_name = '_NH_%s' % notification.name
    getattr(self, method_name, Null)(notification)
# Detach an ended SIP subscription from this flow; the whole flow is ended when
# no SIP subscription is left.
@run_in_twisted_thread
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
subscription = notification.sender
notification.center.remove_observer(self, sender=subscription)
self._sip_subscriptions.remove(subscription)
if not self._sip_subscriptions:
self.end()
# Added by the diff: log the removal together with the remaining subscription count
+ log.msg('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification):
    """Log a failed SIP NOTIFY with the code and reason carried by the notification."""
    source = format_uri(self.xmpp_identity.uri, 'xmpp')
    target = format_uri(self.sip_identity.uri, 'sip')
    failure = notification.data
    log.msg('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (source, target, id(self), failure.code, failure.reason))
def _NH_XMPPSubscriptionChangedState(self, notification):
    """Accept all pending SIP subscriptions once the XMPP subscription goes
    from 'subscribe_sent' to 'active'.
    """
    change = notification.data
    if change.prev_state != 'subscribe_sent' or change.state != 'active':
        return
    document = self._pidf
    # No content type is passed when there is no document to send
    if document is not None:
        content_type = pidf.PIDFDocument.content_type
    else:
        content_type = None
    for sip_subscription in self._sip_subscriptions:
        if sip_subscription.state == 'pending':
            sip_subscription.accept(content_type, document)
def _NH_XMPPSubscriptionGotNotify(self, notification):
    """Cache a received XMPP presence stanza and push the rebuilt PIDF
    document to every SIP subscription of this flow.

    A SIPCoreError raised while pushing to one subscription is logged and
    does not stop the remaining subscriptions from being notified.
    """
    stanza = notification.data.presence
    self._stanza_cache[stanza.sender.uri] = stanza
    pidf_doc = self._build_pidf()
    log.msg('Got XMPP NOTIFY from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self)))
    for subscription in self._sip_subscriptions:
        try:
            subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc)
        except SIPCoreError as e:
            log.msg('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e))
    if not stanza.available:
        # Only inform once about this device being unavailable
        del self._stanza_cache[stanza.sender.uri]
def _NH_XMPPSubscriptionDidFail(self, notification):
    """Drop the failed or ended XMPP subscription and end the whole flow."""
    finished = self._xmpp_subscription
    notification.center.remove_observer(self, sender=finished)
    self._xmpp_subscription = None
    self.end()
# The end of the XMPP subscription is handled exactly like its failure
_NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail
class InterruptSubscription(Exception):
    """Exception type passed to the kill() call in X2SPresenceHandler._CH_subscribe."""
class TerminateSubscription(Exception):
    """Exception type passed to the kill() call in X2SPresenceHandler._CH_unsubscribe."""
class SubscriptionError(Exception):
    """Subscription failure carrying the error, a timeout, an optional
    refresh interval and a flag telling whether the failure is fatal.
    """

    def __init__(self, error, timeout, refresh_interval=None, fatal=False):
        # The base class initializer is deliberately not called, as before
        self.error, self.timeout = error, timeout
        self.refresh_interval, self.fatal = refresh_interval, fatal
class SIPSubscriptionDidFail(Exception):
    """Exception that carries the data object describing a failed SIP subscription."""

    def __init__(self, data):
        # The base class initializer is deliberately not called, as before
        setattr(self, 'data', data)
# Presence flow in the XMPP --> SIP direction: it holds one
# XMPPIncomingSubscription and drives the SIP side through 'subscribe' /
# 'unsubscribe' commands processed by a separately spawned command loop.
class X2SPresenceHandler(object):
implements(IObserver)
# NOTE(review): WriteOnceAttribute presumably allows a single assignment per instance - confirm in application.python.descriptor.
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
    """Store both identities and reset the per-flow state."""
    self.sip_identity = sip_identity
    self.xmpp_identity = xmpp_identity
    self.ended = False
    self.subscribed = False
    # Command loop and its queues
    self._command_proc = None
    self._command_channel = coros.queue()
    self._data_channel = coros.queue()
    # SIP side of the flow
    self._sip_subscription = None
    self._sip_subscription_proc = None
    self._sip_subscription_timer = None
    # XMPP side of the flow
    self._xmpp_subscription = None
def start(self):
    """Start the incoming XMPP subscription, spawn the command loop, request
    the SIP subscription and announce the start.
    """
    center = NotificationCenter()
    incoming = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
    self._xmpp_subscription = incoming
    center.add_observer(self, sender=incoming)
    incoming.start()
    self._command_proc = proc.spawn(self._run)
    self._subscribe_sip()
    center.post_notification('X2SPresenceHandlerDidStart', sender=self)
def end(self):
    """End the XMPP subscription, request the SIP unsubscription when a SIP
    subscription exists, and announce the end.

    Calling it again after the handler has ended does nothing.
    """
    if self.ended:
        return
    center = NotificationCenter()
    xmpp_subscription = self._xmpp_subscription
    if xmpp_subscription is not None:
        center.remove_observer(self, sender=xmpp_subscription)
        xmpp_subscription.end()
        self._xmpp_subscription = None
    if self._sip_subscription:
        self._unsubscribe_sip()
    self.ended = True
    center.post_notification('X2SPresenceHandlerDidEnd', sender=self)
@run_in_green_thread
def _subscribe_sip(self):
    """Queue a 'subscribe' command for the command loop."""
    self._command_channel.send(Command('subscribe'))
@run_in_green_thread
def _unsubscribe_sip(self):
    """Queue an 'unsubscribe' command, wait for it to be signalled, then kill the command loop."""
    request = Command('unsubscribe')
    self._command_channel.send(request)
    request.wait()
    # The command loop is no longer needed once the unsubscribe command was handled
    self._command_proc.kill()
    self._command_proc = None
def _run(self):
    """Command loop: take commands from the channel forever and dispatch each
    one to its `_CH_<name>` method.
    """
    channel = self._command_channel
    while True:
        command = channel.wait()
        getattr(self, '_CH_%s' % command.name)(command)
def _CH_subscribe(self, command):
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._sip_subscription_proc = None
command.signal()
def _process_pidf(self, body):
try:
pidf_doc = pidf.PIDF.parse(body)
except ParserError, e:
log.warn('Error parsing PIDF document: %s' % e)
return
# Build XML stanzas out of PIDF documents
try:
person = (p for p in pidf_doc.persons).next()
except StopIteration:
person = None
for service in pidf_doc.services:
sip_contact = self.sip_identity.uri.as_sip_uri()
if service.device_info is not None:
sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id
else:
sip_contact.parameters['gr'] = service.id
sender = Identity(FrozenURI.parse(sip_contact))
if service.status.extended is not None:
available = service.status.extended != 'offline'
else:
available = service.status.basic == 'open'
stanza = AvailabilityPresence(sender, self.xmpp_identity, available)
for note in service.notes:
stanza.statuses[note.lang] = note
if service.status.extended is not None:
if service.status.extended == 'away':
stanza.show = 'away'
elif service.status.extended == 'busy':
stanza.show = 'dnd'
elif person is not None and person.activities is not None:
activities = set(list(person.activities))
if 'away' in activities:
stanza.show = 'away'
elif set(('holiday', 'vacation')).intersection(activities):
stanza.show = 'xa'
elif 'busy' in activities:
stanza.show = 'dnd'
self._xmpp_subscription.send_presence(stanza)
def _sip_subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
account = AccountManager().sylkserver_account
refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval
try:
# Lookup routes
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
subscription_uri = self.sip_identity.uri.as_sip_uri()
subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
'presence',
RouteHeader(route.uri),
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError(error='Internal error', timeout=5)
self._sip_subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail, e:
notification_center.remove_observer(self, sender=subscription)
self._sip_subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120))
elif e.data.code == 403:
# Forbidden
raise SubscriptionError(error='Forbidden', timeout=None, fatal=True)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
interval = e.data.min_expires
else:
interval = None
raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True)
elif e.data.code == 1400:
raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, give up
raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
if notification.sender is not self._sip_subscription:
continue
if self._xmpp_subscription is None:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'presence':
subscription_state = notification.data.headers.get('Subscription-State').state
if subscription_state == 'active' and self._xmpp_subscription.state != 'active':
self._xmpp_subscription.accept()
elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active':
# The state went from active to pending, hide the presence state?
pass
if notification.data.body:
log.msg('Got SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp')))
self._process_pidf(notification.data.body)
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail, e:
if e.data.code == 0 and e.data.reason == 'rejected':
self._xmpp_subscription.reject()
else:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._sip_subscription)
except InterruptSubscription, e:
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
notification_center.remove_observer(self, sender=self._sip_subscription)
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
except TerminateSubscription, e:
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._sip_subscription)
except SubscriptionError, e:
if not e.fatal:
self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval))
finally:
self.subscribed = False
self._sip_subscription = None
self._sip_subscription_proc = None
reactor.callLater(0, self.end)
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification):
self.end()
def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification):
if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active':
self._xmpp_subscription.accept()
_NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 11:26 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409143
Default Alt Text
(50 KB)

Event Timeline