diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index 5245870..1560c3d 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,496 +1,544 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI, SIPCoreError from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler +from sylk.applications.xmppgateway.media import MediaSessionHandler from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage class XMPPGatewayApplication(SylkApplication): implements(IObserver) def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() + self.media_sessions = set() self.s2x_muc_sessions = {} self.x2s_muc_sessions = {} self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} self.s2x_muc_add_participant_handlers = {} self.x2s_muc_add_participant_handlers = {} def start(self): - NotificationCenter().add_observer(self, sender=self.xmpp_manager) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self.xmpp_manager) + notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.start() def stop(self): - NotificationCenter().remove_observer(self, sender=self.xmpp_manager) + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=self.xmpp_manager) + notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.stop() def incoming_session(self, session): log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) - try: - msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() - except StopIteration: - log.msg('Session rejected: Only MSRP media is supported') - session.reject(488, 'Only MSRP media is supported') - return + stream_types = set([stream.type for stream in session.proposed_streams]) + if 'chat' in stream_types: + self.incoming_chat_session(session) + elif 'audio' in stream_types or 'video' in stream_types: + self.incoming_media_session(session) + else: + log.msg('Session rejected: Unsupported media: %s' % stream_types) + session.reject(488) + + def incoming_chat_session(self, session): # Check if this session is really an invitation to add a participant to a conference room / muc if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters: try: referred_by_uri = SIPURI.parse(session.transfer_info.referred_by) except SIPCoreError: log.msg("SIP multiparty session invitation %s failed: invalid Referred-By header" % session._invitation.call_id) session.reject(488) return muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host) inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host) sender = Identity(muc_uri) recipient = Identity(recipient_uri) inviter = Identity(inviter_uri) try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] except KeyError: handler = S2XMucInvitationHandler(session, sender, recipient, inviter) self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() else: log.msg("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session._invitation.call_id, format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'))) session.reject(480) return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: log.msg('Session rejected: From domain is not a local XMPP domain') session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one log.msg('Session rejected: other session with same identifiers in progress') session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) NotificationCenter().add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session + def incoming_media_session(self, session): + if session.remote_identity.uri.host in self.xmpp_manager.muc_domains: + # Sessions to MUC are not allowed yet + session.reject(403) + return + if session.remote_identity.uri.host not in self.xmpp_manager.domains: + log.msg('Session rejected: From domain is not a local XMPP domain') + session.reject(606, 'Not Acceptable') + return + + handler = MediaSessionHandler.new_from_sip_session(session) + if handler is not None: + NotificationCenter().add_observer(self, sender=handler) + def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return log.msg('SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp'))) if subscribe_request.event != 'presence': log.msg('SIP subscription rejected: only presence event is supported') subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: log.msg('SIP subscription rejected: From domain is not a local XMPP domain') subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) def incoming_sip_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.msg('New SIP Message from %s to %s' % (from_header.uri, to_header.uri)) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: log.msg('Message rejected: From domain is not a local XMPP domain') message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMMessage.parse(data.body) except CPIMParserError: log.msg('Message rejected: CPIM parse error') message_request.answer(400) return else: body = cpim_message.body content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if content_type == 'text/plain': html_body = None else: html_body = body body = None if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.enqueue_xmpp_message(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() def _NH_XMPPGotMucJoinRequest(self, notification): stanza = notification.data.stanza muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) nickname = stanza.recipient.uri.resource try: handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)] except KeyError: xmpp_identity = stanza.sender sip_identity = stanza.recipient sip_identity.uri = muc_uri handler = X2SMucHandler(sip_identity, xmpp_identity, nickname) handler._first_stanza = stanza notification.center.add_observer(self, sender=handler) handler.start() # Check if there was a pending join request on the SIP side try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))] except KeyError: pass else: handler.stop() def _NH_XMPPGotMucAddParticipantRequest(self, notification): sender = notification.data.sender recipient = notification.data.recipient participant = notification.data.participant muc_uri = FrozenURI(recipient.uri.user, recipient.uri.host) sender_uri = FrozenURI(sender.uri.user, sender.uri.host) participant_uri = FrozenURI(participant.uri.user, participant.uri.host) sender = Identity(sender_uri) recipient = Identity(muc_uri) participant = Identity(participant_uri) try: handler = self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] except KeyError: handler = X2SMucInvitationHandler(sender, recipient, participant) self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.msg('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason)) notification.center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) # MUC handling def _NH_X2SMucHandlerDidStart(self, notification): handler = notification.sender log.msg('Multiparty session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SMucHandlerDidEnd(self, notification): handler = notification.sender log.msg('Multiparty session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidStart(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s invited %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) def _NH_X2SMucInvitationHandlerDidEnd(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidFail(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'), notification.data.failure)) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidStart(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg("%s invited %s to multiparty chat %s" % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) def _NH_S2XMucInvitationHandlerDidEnd(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidFail(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'), str(notification.data.failure))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) + # Media sessions + + def _NH_JingleSessionNewIncoming(self, notification): + session = notification.sender + handler = MediaSessionHandler.new_from_jingle_session(session) + if handler is not None: + notification.center.add_observer(self, sender=handler) + + def _NH_MediaSessionHandlerDidStart(self, notification): + handler = notification.sender + log.msg('Media session started sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + self.media_sessions.add(handler) + + def _NH_MediaSessionHandlerDidEnd(self, notification): + handler = notification.sender + log.msg('Media session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + self.media_sessions.remove(handler) + notification.center.remove_observer(self, sender=handler) + + def _NH_MediaSessionHandlerDidFail(self, notification): + handler = notification.sender + log.msg('Media session failed sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) + notification.center.remove_observer(self, sender=handler) + diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py new file mode 100644 index 0000000..46fe1aa --- /dev/null +++ b/sylk/applications/xmppgateway/media.py @@ -0,0 +1,315 @@ +# Copyright (C) 2013 AG Projects. See LICENSE for details +# + +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null +from eventlib.twistedutil import block_on +from sipsimple.account import AccountManager +from sipsimple.conference import AudioConference +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import ContactHeader, FromHeader, ToHeader +from sipsimple.core import Engine, SIPURI +from sipsimple.lookup import DNSLookup, DNSLookupError +from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry +from sipsimple.threading import run_in_twisted_thread +from sipsimple.threading.green import run_in_green_thread +from zope.interface import implements + +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource, decode_resource +from sylk.applications.xmppgateway.logger import log +from sylk.applications.xmppgateway.xmpp import XMPPManager +from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession +from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry +from sylk.applications.xmppgateway.xmpp.stanzas import jingle +from sylk.configuration import SIPConfig +from sylk.session import Session + + +__all__ = ['MediaSessionHandler'] + + +class MediaSessionHandler(object): + implements(IObserver) + + def __init__(self): + self.started = False + self.ended = False + + self._sip_identity = None + self._xmpp_identity = None + + self._audio_bidge = AudioConference() + self.sip_session = None + self.jingle_session = None + + @classmethod + def new_from_sip_session(cls, session): + proposed_stream_types = set([stream.type for stream in session.proposed_streams]) + streams = [] + for stream_type in proposed_stream_types: + try: + klass = JingleMediaStreamRegistry().get(stream_type) + except Exception: + continue + streams.append(klass()) + if not streams: + session.reject(488) + return None + session.send_ring_indication() + instance = cls() + NotificationCenter().add_observer(instance, sender=session) + # Get URI representing the SIP side + contact_uri = session._invitation.remote_contact_header.uri + if contact_uri.parameters.get('gr') is not None: + sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) + else: + tmp = session.remote_identity.uri + sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) + instance._sip_identity = Identity(sip_leg_uri) + # Get URI representing the XMPP side + request_uri = session._invitation.request_uri + remote_resource = request_uri.parameters.get('gr', None) + if remote_resource is not None: + try: + remote_resource = decode_resource(remote_resource) + except (TypeError, UnicodeError): + remote_resource = None + xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) + instance._xmpp_identity = Identity(xmpp_leg_uri) + instance.sip_session = session + instance._start_outgoing_jingle_session(streams) + return instance + + @classmethod + def new_from_jingle_session(cls, session): + proposed_stream_types = set([stream.type for stream in session.proposed_streams]) + streams = [] + for stream_type in proposed_stream_types: + try: + klass = SIPMediaStreamRegistry().get(stream_type) + except Exception: + continue + streams.append(klass()) + if not streams: + session.reject('unsupported-applications') + return None + session.send_ring_indication() + instance = cls() + NotificationCenter().add_observer(instance, sender=session) + instance._xmpp_identity = session.remote_identity + instance._sip_identity = session.local_identity + instance.jingle_session = session + instance._start_outgoing_sip_session(streams) + return instance + + @property + def sip_identity(self): + return self._sip_identity + + @property + def xmpp_identity(self): + return self._xmpp_identity + + def _set_started(self, value): + old_value = self.__dict__.get('started', False) + self.__dict__['started'] = value + if not old_value and value: + NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) + def _get_started(self): + return self.__dict__['started'] + started = property(_get_started, _set_started) + del _get_started, _set_started + + @run_in_green_thread + def _start_outgoing_sip_session(self, streams): + notification_center = NotificationCenter() + # self.xmpp_identity is our local identity on the SIP side + from_uri = self.xmpp_identity.uri.as_sip_uri() + del from_uri.parameters['gr'] # no GRUU in From header + to_uri = self.sip_identity.uri.as_sip_uri() + del to_uri.parameters['gr'] # no GRUU in To header + # TODO: need to fix GRUU in the proxy + #contact_uri = self.xmpp_identity.uri.as_sip_uri() + #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) + lookup = DNSLookup() + settings = SIPSimpleSettings() + account = AccountManager().sylkserver_account + if account.sip.outbound_proxy is not None: + uri = SIPURI(host=account.sip.outbound_proxy.host, + port=account.sip.outbound_proxy.port, + parameters={'transport': account.sip.outbound_proxy.transport}) + else: + uri = to_uri + try: + routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() + except DNSLookupError: + log.warning('DNS lookup error while looking for %s proxy' % uri) + notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) + return + route = routes.pop(0) + from_header = FromHeader(from_uri) + to_header = ToHeader(to_uri) + transport = route.transport + parameters = {} if transport=='udp' else {'transport': transport} + contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) + contact_header = ContactHeader(contact_uri) + self.sip_session = Session(account) + notification_center.add_observer(self, sender=self.sip_session) + self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=streams) + + @run_in_green_thread + def _start_outgoing_jingle_session(self, streams): + if self.xmpp_identity.uri.resource is not None: + self.sip_session.reject() + return + xmpp_manager = XMPPManager() + local_jid = self.sip_identity.uri.as_xmpp_jid() + remote_jid = self.xmpp_identity.uri.as_xmpp_jid() + + # Use disco to gather potential JIDs to call + d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) + try: + items = block_on(d) + except Exception: + items = [] + if not items: + self.sip_session.reject(480) + return + + # Check which items support Jingle + valid = [] + for item in items: + d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) + try: + info = block_on(d) + except Exception: + continue + if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: + valid.append(item.entity) + if not valid: + self.sip_session.reject(480) + return + + # TODO: start multiple sessions? + self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) + + notification_center = NotificationCenter() + self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) + notification_center.add_observer(self, sender=self.jingle_session) + self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams) + + def end(self): + if self.ended: + return + notification_center = NotificationCenter() + if self.sip_session is not None: + notification_center.remove_observer(self, sender=self.sip_session) + if self.sip_session.direction == 'incoming' and not self.started: + self.sip_session.reject() + else: + self.sip_session.end() + self.sip_session = None + if self.jingle_session is not None: + notification_center.remove_observer(self, sender=self.jingle_session) + if self.jingle_session.direction == 'incoming' and not self.started: + self.jingle_session.reject() + else: + self.jingle_session.end() + self.jingle_session = None + self.ended = True + if self.started: + notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) + else: + notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) + + @run_in_twisted_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPSessionDidStart(self, notification): + log.msg("SIP session %s started" % notification.sender._invitation.call_id) + if self.sip_session.direction == 'outgoing': + # Time to accept the Jingle session and bridge them together + try: + audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') + except StopIteration: + pass + else: + self._audio_bidge.add(audio_stream) + self.jingle_session.accept(self.jingle_session.proposed_streams) + else: + # Both sessions have been accepted now + self.started = True + try: + audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') + except StopIteration: + pass + else: + self._audio_bidge.add(audio_stream) + + def _NH_SIPSessionDidEnd(self, notification): + log.msg("SIP session %s ended" % notification.sender._invitation.call_id) + notification.center.remove_observer(self, sender=self.sip_session) + self.sip_session = None + self.end() + + def _NH_SIPSessionDidFail(self, notification): + log.msg("SIP session %s failed (%s)" % (notification.sender._invitation.call_id, notification.data.reason)) + notification.center.remove_observer(self, sender=self.sip_session) + self.sip_session = None + self.end() + + def _NH_SIPSessionGotProposal(self, notification): + self.sip_session.reject_proposal() + + def _NH_SIPSessionTransferNewIncoming(self, notification): + self.sip_session.reject_transfer(403) + + def _NH_SIPSessionDidChangeHoldState(self, notification): + if notification.data.originator == 'remote': + if notification.data.on_hold: + self.jingle_session.hold() + else: + self.jingle_session.unhold() + + def _NH_JingleSessionDidStart(self, notification): + log.msg("Jingle session %s started" % notification.sender.id) + if self.jingle_session.direction == 'incoming': + # Both sessions have been accepted now + self.started = True + try: + audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') + except StopIteration: + pass + else: + self._audio_bidge.add(audio_stream) + else: + # Time to accept the Jingle session and bridge them together + try: + audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') + except StopIteration: + pass + else: + self._audio_bidge.add(audio_stream) + self.sip_session.accept(self.sip_session.proposed_streams) + + def _NH_JingleSessionDidEnd(self, notification): + log.msg("Jingle session %s ended" % notification.sender.id) + notification.center.remove_observer(self, sender=self.jingle_session) + self.jingle_session = None + self.end() + + def _NH_JingleSessionDidFail(self, notification): + log.msg("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) + notification.center.remove_observer(self, sender=self.jingle_session) + self.jingle_session = None + self.end() + + def _NH_JingleSessionDidChangeHoldState(self, notification): + if notification.data.originator == 'remote': + if notification.data.on_hold: + self.sip_session.hold() + else: + self.sip_session.unhold() + diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index b1cab7c..162ccd2 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,257 +1,319 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from twisted.internet import reactor +from wokkel.disco import DiscoClientProtocol from wokkel.generic import FallbackHandler, VersionHandler from wokkel.ping import PingHandler from wokkel.server import ServerService from zope.interface import implements from sylk import __version__ as SYLK_VERSION from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.logger import log -from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, MessageProtocol, MUCServerProtocol, PresenceProtocol +from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession, JingleSessionManager +from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, JingleProtocol, MessageProtocol, MUCServerProtocol, PresenceProtocol from sylk.applications.xmppgateway.xmpp.server import SylkInternalComponent, SylkRouter, SylkS2SServerFactory, xmpp_logger from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False self.domains = set(config.domains) self.muc_domains = set(['%s.%s' % (config.muc_prefix, domain) for domain in self.domains]) router = SylkRouter() self._server_service = ServerService(router) self._server_service.domains = self.domains | self.muc_domains self._server_service.logTraffic = False # done manually self._s2s_factory = SylkS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually # Setup internal components self._internal_component = SylkInternalComponent(router) self._internal_component.domains = self.domains self._internal_component.manager = self self._muc_component = SylkInternalComponent(router) self._muc_component.domains = self.muc_domains self._muc_component.manager = self # Setup protocols self.message_protocol = MessageProtocol() self.message_protocol.setHandlerParent(self._internal_component) self.presence_protocol = PresenceProtocol() self.presence_protocol.setHandlerParent(self._internal_component) self.disco_protocol = DiscoProtocol() self.disco_protocol.setHandlerParent(self._internal_component) + self.disco_client_protocol = DiscoClientProtocol() + self.disco_client_protocol.setHandlerParent(self._internal_component) + self.muc_protocol = MUCServerProtocol() self.muc_protocol.setHandlerParent(self._muc_component) self.disco_muc_protocol = DiscoProtocol() self.disco_muc_protocol.setHandlerParent(self._muc_component) self.version_protocol = VersionHandler('SylkServer', SYLK_VERSION) self.version_protocol.setHandlerParent(self._internal_component) self.fallback_protocol = FallbackHandler() self.fallback_protocol.setHandlerParent(self._internal_component) self.fallback_muc_protocol = FallbackHandler() self.fallback_muc_protocol.setHandlerParent(self._muc_component) self.ping_protocol = PingHandler() self.ping_protocol.setHandlerParent(self._internal_component) + self.jingle_protocol = JingleProtocol() + self.jingle_protocol.setHandlerParent(self._internal_component) + self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() self.muc_session_manager = XMPPMucSessionManager() self.subscription_manager = XMPPSubscriptionManager() + self.jingle_session_manager = JingleSessionManager() def start(self): self.stopped = False xmpp_logger.start() config = XMPPGatewayConfig if config.trace_xmpp and xmpp_logger._xmpptrace_filename is not None: log.msg('Logging XMPP trace to file "%s"' % xmpp_logger._xmpptrace_filename) self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) listen_address = self._s2s_listener.getHost() log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() self.muc_session_manager.start() self.subscription_manager.start() + self.jingle_session_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) notification_center.add_observer(self, sender=self._muc_component) self._internal_component.startService() self._muc_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() + self.jingle_session_manager.stop() self.subscription_manager.stop() self.muc_session_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() self._muc_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) notification_center.remove_observer(self, sender=self._muc_component) xmpp_logger.stop() def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) def send_muc_stanza(self, stanza): if self.stopped: return self._muc_component.send(stanza.to_xml_element()) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification.center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) # Process muc stanzas def _NH_XMPPMucGotGroupChat(self, notification): message = notification.data.message muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)] except KeyError: # Ignore groupchat messages if there was no session created pass else: session.channel.send(message) def _NH_XMPPMucGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if not stanza.sender.uri.resource: return muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)] except KeyError: if stanza.available: notification.center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza)) else: notification.center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza)) else: session.channel.send(stanza) def _NH_XMPPMucGotInvitation(self, notification): invitation = notification.data.invitation data = NotificationData(sender=invitation.sender, recipient=invitation.recipient, participant=invitation.invited_user) notification.center.post_notification('XMPPGotMucAddParticipantRequest', sender=self, data=data) + # Jingle + + def _NH_XMPPGotJingleSessionInitiate(self, notification): + stanza = notification.data.stanza + try: + self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + session = JingleSession(self.jingle_protocol) + session.init_incoming(stanza) + session.send_ring_indication() + + def _NH_XMPPGotJingleSessionTerminate(self, notification): + stanza = notification.data.stanza + try: + session = self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + return + session.handle_notification(notification) + + def _NH_XMPPGotJingleSessionInfo(self, notification): + stanza = notification.data.stanza + try: + session = self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + return + session.handle_notification(notification) + + def _NH_XMPPGotJingleSessionAccept(self, notification): + stanza = notification.data.stanza + try: + session = self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + return + session.handle_notification(notification) + + def _NH_XMPPGotJingleDescriptionInfo(self, notification): + stanza = notification.data.stanza + try: + session = self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + return + session.handle_notification(notification) + + def _NH_XMPPGotJingleTransportInfo(self, notification): + stanza = notification.data.stanza + try: + session = self.jingle_session_manager.sessions[stanza.jingle.sid] + except KeyError: + return + session.handle_notification(notification) + diff --git a/sylk/applications/xmppgateway/xmpp/jingle/__init__.py b/sylk/applications/xmppgateway/xmpp/jingle/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sylk/applications/xmppgateway/xmpp/jingle/session.py b/sylk/applications/xmppgateway/xmpp/jingle/session.py new file mode 100644 index 0000000..611a585 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/jingle/session.py @@ -0,0 +1,730 @@ +# Copyright (C) 2013 AG Projects. See LICENSE for details +# + +import random +import string + +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null +from application.python.types import Singleton +from datetime import datetime +from eventlib import api, coros, proc +from eventlib.twistedutil import block_on +from sipsimple.account import AccountManager +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import SDPSession, SDPMediaStream, SDPConnection, SDPNegotiator +from sipsimple.core import SIPCoreError +from sipsimple.threading import run_in_twisted_thread +from twisted.internet import reactor +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.protocols.jabber.xmlstream import TimeoutError as IqTimeoutError +from zope.interface import implements + +from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI +from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError +from sylk.applications.xmppgateway.xmpp.jingle.util import jingle_to_sdp, sdp_to_jingle +from sylk.applications.xmppgateway.xmpp.stanzas import jingle +from sylk.configuration import SIPConfig + + +def random_id(): + return ''.join(random.choice(string.ascii_letters+string.digits) for x in xrange(32)) + + +class MediaStreamDidFailError(Exception): + def __init__(self, stream, data): + self.stream = stream + self.data = data + + +class Operation(object): + __params__ = () + + def __init__(self, **params): + for name, value in params.iteritems(): + setattr(self, name, value) + for param in set(self.__params__).difference(params): + raise ValueError("missing operation parameter: '%s'" % param) + self.channel = coros.queue() + + +class AcceptOperation(Operation): + __params__ = ('streams',) + + +class SendRingIndicationOperation(Operation): + __params__ = () + + +class RejectOperation(Operation): + __params__ = ('reason',) + + +class EndOperation(Operation): + __params__ = () + + +class HoldOperation(Operation): + __params__ = () + + +class UnholdOperation(Operation): + __params__ = () + + +class ProcessRemoteOperation(Operation): + __params__ = ('notification',) + + +class ConnectOperation(Operation): + __params__ = ('sender', 'recipient', 'streams') + + +class JingleSession(object): + implements(IObserver) + + jingle_stanza_timeout = 3 + media_stream_timeout = 15 + + def __init__(self, protocol): + self.account = AccountManager().sylkserver_account + self._protocol = protocol + + self._id = None + self._local_identity = None + self._remote_identity = None + self._local_jid = None + self._remote_jid = None + + self._channel = coros.queue() + self._current_operation = None + self._proc = proc.spawn(self._run) + self._timer = None + + self._sdp_negotiator = None + self._pending_transport_info_stanzas = [] + + self.direction = None + self.state = None + self.streams = None + self.proposed_streams = None + self.start_time = None + self.end_time = None + self.on_hold = False + + def init_incoming(self, stanza): + self._id = stanza.jingle.sid + self._local_identity = Identity(FrozenURI.parse(stanza.recipient)) + self._remote_identity = Identity(FrozenURI.parse(stanza.sender)) + self._local_jid = self._local_identity.uri.as_xmpp_jid() + self._remote_jid = self._remote_identity.uri.as_xmpp_jid() + + remote_sdp = jingle_to_sdp(stanza.jingle) + try: + self._sdp_negotiator = SDPNegotiator.create_with_remote_offer(remote_sdp) + except SIPCoreError, e: + self._fail(originator='local', reason='general-error', description=str(e)) + return + + self.proposed_streams = [] + for index, media_stream in enumerate(remote_sdp.media): + if media_stream.port != 0: + for stream_type in MediaStreamRegistry(): + try: + stream = stream_type.new_from_sdp(self, remote_sdp, index) + except InvalidStreamError: + break + except UnknownStreamError: + continue + else: + stream.index = index + self.proposed_streams.append(stream) + break + + if self.proposed_streams: + self.direction = 'incoming' + self.state = 'incoming' + NotificationCenter().post_notification('JingleSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams)) + else: + self._fail(originator='local', reason='unsupported-applications') + + def connect(self, sender_identity, recipient_identity, streams): + self._schedule_operation(ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams)) + + def send_ring_indication(self): + self._schedule_operation(SendRingIndicationOperation()) + + def accept(self, streams): + self._schedule_operation(AcceptOperation(streams=streams)) + + def reject(self, reason='busy'): + self._schedule_operation(RejectOperation(reason=reason)) + + def hold(self): + self._schedule_operation(HoldOperation()) + + def unhold(self): + self._schedule_operation(UnholdOperation()) + + def end(self): + self._schedule_operation(EndOperation()) + + def add_stream(self): + raise NotImplementedError + + def remove_stream(self): + raise NotImplementedError + + @property + def id(self): + return self._id + + @property + def local_identity(self): + return self._local_identity + + @property + def remote_identity(self): + return self._remote_identity + + def _send_stanza(self, stanza): + if self.direction == 'incoming': + stanza.jingle.initiator = unicode(self._remote_jid) + stanza.jingle.responder = unicode(self._local_jid) + else: + stanza.jingle.initiator = unicode(self._local_jid) + stanza.jingle.responder = unicode(self._remote_jid) + stanza.timeout = self.jingle_stanza_timeout + return self._protocol.request(stanza) + + def _fail(self, originator='local', reason='general-error', description=None): + reason = jingle.Reason(jingle.ReasonType(reason), text=description) + stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) + self._send_stanza(stanza) + self.state = 'terminated' + failure_str = '%s%s' % (reason, ' %s' % description if description else '') + NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=failure_str)) + self._channel.send_exception(proc.ProcExit) + + @run_in_twisted_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_MediaStreamDidInitialize(self, notification): + if self._current_operation is not None: + self._current_operation.channel.send(notification) + + def _NH_MediaStreamDidStart(self, notification): + if self._current_operation is not None: + self._current_operation.channel.send(notification) + + def _NH_MediaStreamDidFail(self, notification): + if self._current_operation is not None: + self._current_operation.channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) + else: + self.end() + + def _NH_XMPPGotJingleSessionAccept(self, notification): + self._schedule_operation(ProcessRemoteOperation(notification=notification)) + + def _NH_XMPPGotJingleSessionTerminate(self, notification): + self._schedule_operation(ProcessRemoteOperation(notification=notification)) + + def _NH_XMPPGotJingleSessionInfo(self, notification): + self._schedule_operation(ProcessRemoteOperation(notification=notification)) + + def _NH_XMPPGotJingleDescriptionInfo(self, notification): + self._schedule_operation(ProcessRemoteOperation(notification=notification)) + + def _NH_XMPPGotJingleTransportInfo(self, notification): + self._schedule_operation(ProcessRemoteOperation(notification=notification)) + + # Operation handling + + @run_in_twisted_thread + def _schedule_operation(self, operation): + self._channel.send(operation) + + def _run(self): + while True: + self._current_operation = op = self._channel.wait() + try: + handler = getattr(self, '_OH_%s' % op.__class__.__name__) + handler(op) + except BaseException: + self._proc = None + raise + finally: + self._current_operation = None + + def _OH_AcceptOperation(self, operation): + if self.state != 'incoming': + return + + notification_center = NotificationCenter() + settings = SIPSimpleSettings() + streams = operation.streams + + for stream in self.proposed_streams: + if stream in streams: + notification_center.add_observer(self, sender=stream) + stream.initialize(self, direction='incoming') + + try: + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = operation.channel.wait() + if notification.name == 'MediaStreamDidInitialize': + wait_count -= 1 + + remote_sdp = self._sdp_negotiator.current_remote + local_ip = SIPConfig.local_ip.normalized + local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, media in enumerate(remote_sdp.media): + stream = stream_map.get(index, None) + if stream is not None: + media = stream.get_local_media(for_offer=False) + else: + media = SDPMediaStream.new(media) + media.port = 0 + media.attributes = [] + local_sdp.media.append(media) + try: + self._sdp_negotiator.set_local_answer(local_sdp) + self._sdp_negotiator.negotiate() + except SIPCoreError, e: + self._fail(originator='local', reason='incompatible-parameters', description=str(e)) + return + + notification_center.post_notification('JingleSessionWillStart', sender=self) + + # Get active SDPs (negotiator may make changes) + local_sdp = self._sdp_negotiator.active_local + remote_sdp = self._sdp_negotiator.active_remote + + # Build the payload and send it over + payload = sdp_to_jingle(local_sdp) + payload.sid = self._id + stanza = self._protocol.sessionAccept(self._local_jid, self._remote_jid, payload) + d = self._send_stanza(stanza) + block_on(d) + + wait_count = 0 + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, local_media in enumerate(local_sdp.media): + remote_media = remote_sdp.media[index] + stream = stream_map.get(index, None) + if stream is not None: + if remote_media.port: + wait_count += 1 + stream.start(local_sdp, remote_sdp, index) + else: + notification_center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] + for stream in removed_streams: + notification_center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + with api.timeout(self.media_stream_timeout): + while wait_count > 0: + notification = operation.channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + except (MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + if isinstance(e, api.TimeoutError): + error = 'media stream timed out while starting' + elif isinstance(e, IqTimeoutError): + error = 'timeout sending IQ stanza' + elif isinstance(e, StanzaError): + error = str(e.condition) + else: + error = 'media stream failed: %s' % e.data.reason + self._fail(originator='local', reason='failed-application', description=error) + else: + self.state = 'connected' + self.streams = self.proposed_streams + self.proposed_streams = None + self.start_time = datetime.now() + notification_center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) + + def _OH_ConnectOperation(self, operation): + if self.state is not None: + return + + settings = SIPSimpleSettings() + notification_center = NotificationCenter() + + self.direction = 'outgoing' + self.state = 'connecting' + self.proposed_streams = operation.streams + self._id = random_id() + self._local_identity = operation.sender + self._remote_identity = operation.recipient + self._local_jid = self._local_identity.uri.as_xmpp_jid() + self._remote_jid = self._remote_identity.uri.as_xmpp_jid() + + notification_center.post_notification('JingleSessionNewOutgoing', self, NotificationData(streams=operation.streams)) + + for stream in self.proposed_streams: + notification_center.add_observer(self, sender=stream) + stream.initialize(self, direction='outgoing') + + try: + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = operation.channel.wait() + if notification.name == 'MediaStreamDidInitialize': + wait_count -= 1 + # Build local SDP and negotiator + local_ip = SIPConfig.local_ip.normalized + local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) + for index, stream in enumerate(self.proposed_streams): + stream.index = index + media = stream.get_local_media(for_offer=True) + local_sdp.media.append(media) + self._sdp_negotiator = SDPNegotiator.create_with_local_offer(local_sdp) + # Build the payload and send it over + payload = sdp_to_jingle(local_sdp) + payload.sid = self._id + stanza = self._protocol.sessionInitiate(self._local_jid, self._remote_jid, payload) + d = self._send_stanza(stanza) + block_on(d) + except (MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + if isinstance(e, IqTimeoutError): + error = 'timeout sending IQ stanza' + elif isinstance(e, StanzaError): + error = str(e.condition) + elif isinstance(e, SIPCoreError): + error = str(e) + else: + error = 'media stream failed: %s' % e.data.reason + self.state = 'terminated' + NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=error)) + self._channel.send_exception(proc.ProcExit) + else: + self._timer = reactor.callLater(settings.sip.invite_timeout, self.end) + + def _OH_RejectOperation(self, operation): + if self.state != 'incoming': + return + reason = jingle.Reason(jingle.ReasonType(operation.reason)) + stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) + self._send_stanza(stanza) + self.state = 'terminated' + self._channel.send_exception(proc.ProcExit) + + def _OH_EndOperation(self, operation): + if self.state not in ('connecting', 'connected'): + return + + if self._timer is not None and self._timer.active(): + self._timer.cancel() + self._timer = None + + prev_state = self.state + self.state = 'terminating' + notification_center = NotificationCenter() + notification_center.post_notification('JingleSessionWillEnd', self) + + streams = (self.streams or []) + (self.proposed_streams or []) + for stream in streams[:]: + try: + notification_center.remove_observer(self, sender=stream) + except KeyError: + streams.remove(stream) + else: + stream.deactivate() + + if prev_state == 'connected': + reason = jingle.Reason(jingle.ReasonType('success')) + else: + reason = jingle.Reason(jingle.ReasonType('cancel')) + stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) + self._send_stanza(stanza) + + self.state = 'terminated' + if prev_state == 'connected': + self.end_time = datetime.now() + notification_center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='local')) + else: + notification_center.post_notification('JingleSessionDidFail', self, NotificationData(originator='local', reason='cancel')) + + for stream in streams: + stream.end() + self._channel.send_exception(proc.ProcExit) + + def _OH_SendRingIndicationOperation(self, operation): + if self.state != 'incoming': + return + stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('ringing')) + self._send_stanza(stanza) + + def _OH_HoldOperation(self, operation): + if self.state != 'connected': + return + if self.on_hold: + return + self.on_hold = True + for stream in self.streams: + stream.hold() + stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('hold')) + self._send_stanza(stanza) + NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=False)) + + def _OH_UnholdOperation(self, operation): + if self.state != 'connected': + return + if not self.on_hold: + return + self.on_hold = False + for stream in self.streams: + stream.unhold() + stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('unhold')) + self._send_stanza(stanza) + NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) + + def _OH_ProcessRemoteOperation(self, operation): + notification = operation.notification + stanza = notification.data.stanza + if notification.name == 'XMPPGotJingleSessionTerminate': + if self.state not in ('incoming', 'connecting', 'connected'): + return + if self._timer is not None and self._timer.active(): + self._timer.cancel() + self._timer = None + # Session ended remotely + prev_state = self.state + self.state = 'terminated' + if prev_state == 'incoming': + reason = stanza.jingle.reason.value if stanza.jingle.reason else 'cancel' + notification.center.post_notification('JingleSessionDidFail', self, NotificationData(originator='remote', reason=reason)) + else: + notification.center.post_notification('JingleSessionWillEnd', self, NotificationData(originator='remote')) + streams = self.proposed_streams if prev_state == 'connecting' else self.streams + for stream in streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self.end_time = datetime.now() + notification.center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='remote')) + self._channel.send_exception(proc.ProcExit) + elif notification.name == 'XMPPGotJingleSessionInfo': + info = stanza.jingle.info + if not info: + return + if info == 'ringing': + if self.state != 'connecting': + return + notification.center.post_notification('JingleSessionGotRingIndication', self) + elif info in ('hold', 'unhold'): + if self.state != 'connected': + return + notification.center.post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=info=='hold', partial=False)) + elif notification.name == 'XMPPGotJingleDescriptionInfo': + if self.state != 'connecting': + return + + # Add candidates acquired on transport-info stanzas + for s in self._pending_transport_info_stanzas: + for c in s.jingle.content: + content = next(content for content in stanza.jingle.content if content.name == c.name) + content.transport.candidates.extend(c.transport.candidates) + if isinstance(content.transport, jingle.IceUdpTransport): + if not content.transport.ufrag and c.transport.ufrag: + content.transport.ufrag = c.transport.ufrag + if not content.transport.password and c.transport.password: + content.transport.password = c.transport.password + + remote_sdp = jingle_to_sdp(stanza.jingle) + try: + self._sdp_negotiator.set_remote_answer(remote_sdp) + self._sdp_negotiator.negotiate() + except SIPCoreError: + # The description-info stanza may have been just a parameter change, not a full 'SDP' + return + + if self._timer is not None and self._timer.active(): + self._timer.cancel() + self._timer = None + + del self._pending_transport_info_stanzas[:] + + # Get active SDPs (negotiator may make changes) + local_sdp = self._sdp_negotiator.active_local + remote_sdp = self._sdp_negotiator.active_remote + + notification.center.post_notification('JingleSessionWillStart', sender=self) + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, local_media in enumerate(local_sdp.media): + remote_media = remote_sdp.media[index] + stream = stream_map[index] + if remote_media.port: + stream.start(local_sdp, remote_sdp, index) + else: + notification.center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] + for stream in removed_streams: + notification.center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + + try: + with api.timeout(self.media_stream_timeout): + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = operation.channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + except (MediaStreamDidFailError, api.TimeoutError), e: + for stream in self.proposed_streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + if isinstance(e, api.TimeoutError): + error = 'media stream timed out while starting' + else: + error = 'media stream failed: %s' % e.data.reason + self._fail(originator='local', reason='failed-application', description=error) + else: + self.state = 'connected' + self.streams = self.proposed_streams + self.proposed_streams = None + self.start_time = datetime.now() + notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) + elif notification.name == 'XMPPGotJingleSessionAccept': + if self.state != 'connecting': + return + if self._timer is not None and self._timer.active(): + self._timer.cancel() + self._timer = None + + # Add candidates acquired on transport-info stanzas + for s in self._pending_transport_info_stanzas: + for c in s.jingle.content: + content = next(content for content in stanza.jingle.content if content.name == c.name) + content.transport.candidates.extend(c.transport.candidates) + if isinstance(content.transport, jingle.IceUdpTransport): + if not content.transport.ufrag and c.transport.ufrag: + content.transport.ufrag = c.transport.ufrag + if not content.transport.password and c.transport.password: + content.transport.password = c.transport.password + del self._pending_transport_info_stanzas[:] + + remote_sdp = jingle_to_sdp(stanza.jingle) + try: + self._sdp_negotiator.set_remote_answer(remote_sdp) + self._sdp_negotiator.negotiate() + except SIPCoreError, e: + for stream in self.proposed_streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self._fail(originator='remote', reason='incompatible-parameters', description=str(e)) + return + + # Get active SDPs (negotiator may make changes) + local_sdp = self._sdp_negotiator.active_local + remote_sdp = self._sdp_negotiator.active_remote + + notification.center.post_notification('JingleSessionWillStart', sender=self) + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, local_media in enumerate(local_sdp.media): + remote_media = remote_sdp.media[index] + stream = stream_map[index] + if remote_media.port: + stream.start(local_sdp, remote_sdp, index) + else: + notification.center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] + for stream in removed_streams: + notification.center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + + try: + with api.timeout(self.media_stream_timeout): + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = operation.channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + except (MediaStreamDidFailError, api.TimeoutError), e: + for stream in self.proposed_streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + if isinstance(e, api.TimeoutError): + error = 'media stream timed out while starting' + else: + error = 'media stream failed: %s' % e.data.reason + self._fail(originator='local', reason='failed-application', description=error) + else: + self.state = 'connected' + self.streams = self.proposed_streams + self.proposed_streams = None + self.start_time = datetime.now() + notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) + elif notification.name == 'XMPPGotJingleTransportInfo': + if self.state != 'connecting': + # ICE trickling not supported yet, so only accept candidates before accept + return + self._pending_transport_info_stanzas.append(stanza) + + +class JingleSessionManager(object): + __metaclass__ = Singleton + implements(IObserver) + + def __init__(self): + self.sessions = {} + + def start(self): + notification_center = NotificationCenter() + notification_center.add_observer(self, name='JingleSessionNewIncoming') + notification_center.add_observer(self, name='JingleSessionNewOutgoing') + notification_center.add_observer(self, name='JingleSessionDidFail') + notification_center.add_observer(self, name='JingleSessionDidEnd') + + def stop(self): + notification_center = NotificationCenter() + notification_center.remove_observer(self, name='JingleSessionNewIncoming') + notification_center.remove_observer(self, name='JingleSessionNewOutgoing') + notification_center.remove_observer(self, name='JingleSessionDidFail') + notification_center.remove_observer(self, name='JingleSessionDidEnd') + + def handle_notification(self, notification): + if notification.name in ('JingleSessionNewIncoming', 'JingleSessionNewOutgoing'): + session = notification.sender + self.sessions[session.id] = session + elif notification.name in ('JingleSessionDidFail', 'JingleSessionDidEnd'): + session = notification.sender + del self.sessions[session.id] + diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py new file mode 100644 index 0000000..b88723f --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py @@ -0,0 +1,120 @@ +# Copyright (C) 2009-2011 AG Projects. See LICENSE for details. +# + +""" +This module automatically registers media streams to a stream registry +allowing for a plug and play mechanism of various types of media +negoticated in a SIP session that can be added to this library by using +a generic API. + +For actual usage see rtp.py and msrp.py that implement media streams +based on their respective RTP and MSRP protocols. +""" + + +from operator import attrgetter +from application.python.types import Singleton +from zope.interface import Interface, Attribute + + +class StreamError(Exception): pass +class InvalidStreamError(StreamError): pass +class UnknownStreamError(StreamError): pass + + +# The MediaStream interface +# +class IMediaStream(Interface): + type = Attribute("A string identifying the stream type (ex: audio, video, ...)") + priority = Attribute("An integer value indicating the stream priority relative to the other streams types (higher numbers have higher priority).") + + session = Attribute("Session object to which this stream is attached") + + hold_supported = Attribute("True if the stream supports hold") + on_hold_by_local = Attribute("True if the stream is on hold by the local party") + on_hold_by_remote = Attribute("True if the stream is on hold by the remote") + on_hold = Attribute("True if either on_hold_by_local or on_hold_by_remote is true") + + # this should be a classmethod, but zopeinterface complains if we decorate it with @classmethod -Dan + def new_from_sdp(cls, session, remote_sdp, stream_index): + pass + + def get_local_media(self, for_offer): + pass + + def initialize(self, session, direction): + pass + + def start(self, local_sdp, remote_sdp, stream_index): + pass + + def deactivate(self): + pass + + def end(self): + pass + + def validate_update(self, remote_sdp, stream_index): + pass + + def update(self, local_sdp, remote_sdp, stream_index): + pass + + def hold(self): + pass + + def unhold(self): + pass + + def reset(self, stream_index): + pass + +# The MediaStream registry +# +class StreamDescriptor(object): + def __init__(self, type): + self.type = type + def __get__(self, obj, objtype): + return self if obj is None else obj.get(self.type) + def __set__(self, obj, value): + raise AttributeError('cannot set attribute') + def __delete__(self, obj): + raise AttributeError('cannot delete attribute') + + +class MediaStreamRegistry(object): + __metaclass__ = Singleton + + def __init__(self): + self.__types__ = [] + + def __iter__(self): + return iter(self.__types__) + + def add(self, cls): + if cls.priority is not None and cls not in self.__types__: + self.__types__.append(cls) + self.__types__.sort(key=attrgetter('priority'), reverse=True) + setattr(self.__class__, cls.type.title().translate(None, ' -_') + 'Stream', StreamDescriptor(cls.type)) + + def get(self, type): + try: + return next(cls for cls in self.__types__ if cls.type == type) + except StopIteration: + raise UnknownStreamError("unknown stream type: %s" % type) + + +class MediaStreamRegistrar(type): + """Metaclass for adding a MediaStream to the media stream's class registry""" + def __init__(cls, name, bases, dic): + super(MediaStreamRegistrar, cls).__init__(name, bases, dic) + MediaStreamRegistry().add(cls) + + +# Import the streams defined in submodules +# +from sylk.applications.xmppgateway.xmpp.jingle.streams import rtp +from sylk.applications.xmppgateway.xmpp.jingle.streams.rtp import * + +__all__ = ['StreamError', 'InvalidStreamError', 'UnknownStreamError', 'IMediaStream', 'MediaStreamRegistry', 'MediaStreamRegistrar'] + rtp.__all__ + diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py new file mode 100644 index 0000000..182a359 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py @@ -0,0 +1,441 @@ +# Copyright (C) 2009-2011 AG Projects. See LICENSE for details. +# + +""" +Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, +RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19. +""" + +__all__ = ['AudioStream'] + +from threading import RLock + +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null +from zope.interface import implements + +from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError + +from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError +from sylk.configuration import SIPConfig + + +class AudioStream(object): + __metaclass__ = MediaStreamRegistrar + + implements(IMediaStream, IAudioPort, IObserver) + + _streams = [] + + type = 'audio' + priority = 1 + + hold_supported = True + + def __init__(self): + from sipsimple.application import SIPApplication + self.mixer = SIPApplication.voice_audio_mixer + self.bridge = AudioBridge(self.mixer) + self.device = AudioDevice(self.mixer) + self.notification_center = NotificationCenter() + self.on_hold_by_local = False + self.on_hold_by_remote = False + self.direction = None + self.state = "NULL" + self._audio_transport = None + self._hold_request = None + self._ice_state = "NULL" + self._lock = RLock() + self._rtp_transport = None + self.session = None + self._try_ice = False + self._try_forced_srtp = False + self._use_srtp = False + + self.bridge.add(self.device) + + # Audio properties + # + + @property + def codec(self): + return self._audio_transport.codec if self._audio_transport else None + + @property + def consumer_slot(self): + return self._audio_transport.slot if self._audio_transport else None + + @property + def producer_slot(self): + return self._audio_transport.slot if self._audio_transport and not self.muted else None + + @property + def sample_rate(self): + return self._audio_transport.sample_rate if self._audio_transport else None + + @property + def statistics(self): + return self._audio_transport.statistics if self._audio_transport else None + + def _get_muted(self): + return self.__dict__.get('muted', False) + def _set_muted(self, value): + if not isinstance(value, bool): + raise ValueError("illegal value for muted property: %r" % (value,)) + if value == self.muted: + return + old_producer_slot = self.producer_slot + self.__dict__['muted'] = value + notification_center = NotificationCenter() + data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot) + notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data) + muted = property(_get_muted, _set_muted) + del _get_muted, _set_muted + + # RTP properties + # + + @property + def local_rtp_address(self): + return self._rtp_transport.local_rtp_address if self._rtp_transport else None + + @property + def local_rtp_port(self): + return self._rtp_transport.local_rtp_port if self._rtp_transport else None + + @property + def remote_rtp_address(self): + if self._ice_state == "IN_USE": + return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None + else: + return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None + + @property + def remote_rtp_port(self): + if self._ice_state == "IN_USE": + return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None + else: + return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None + + @property + def local_rtp_candidate_type(self): + return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None + + @property + def remote_rtp_candidate_type(self): + return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None + + @property + def srtp_active(self): + return self._rtp_transport.srtp_active if self._rtp_transport else False + + @property + def ice_active(self): + return self._ice_state == "IN_USE" + + # Generic properties + # + + @property + def on_hold(self): + return self.on_hold_by_local or self.on_hold_by_remote + + # Public methods + # + + @classmethod + def new_from_sdp(cls, session, remote_sdp, stream_index): + # TODO: actually validate the SDP + settings = SIPSimpleSettings() + remote_stream = remote_sdp.media[stream_index] + if remote_stream.media != 'audio': + raise UnknownStreamError + if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'): + raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in audio stream, got %s" % remote_stream.transport) + if session.account.rtp.srtp_encryption == "mandatory" and not remote_stream.has_srtp: + raise InvalidStreamError("SRTP is locally mandatory but it's not remotely enabled") + supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list + if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): + raise InvalidStreamError("no compatible codecs found") + stream = cls() + stream._incoming_remote_sdp = remote_sdp + stream._incoming_stream_index = stream_index + stream._incoming_stream_has_srtp = remote_stream.has_srtp + stream._incoming_stream_has_srtp_forced = remote_stream.transport == 'RTP/SAVP' + return stream + + def initialize(self, session, direction): + with self._lock: + self._streams.append(self) + if self.state != "NULL": + raise RuntimeError("AudioStream.initialize() may only be called in the NULL state") + self.state = "INITIALIZING" + self.session = session + if hasattr(self, "_incoming_remote_sdp"): + # ICE attributes could come at the session level or at the media level + remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] + self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates + self._use_srtp = self._incoming_stream_has_srtp + self._try_forced_srtp = self._incoming_stream_has_srtp_forced + if self._incoming_stream_has_srtp_forced and not self._use_srtp: + self.state = "ENDED" + self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason="SRTP is remotely mandatory but it's not locally enabled")) + return + del self._incoming_stream_has_srtp + del self._incoming_stream_has_srtp_forced + else: + # TODO: Always use ICE? New settings object? + #self._try_ice = self.session.account.nat_traversal.use_ice + self._try_ice = True + self._use_srtp = self.session.account.rtp.srtp_encryption != "disabled" + self._try_forced_srtp = self.session.account.rtp.srtp_encryption == "mandatory" + self._init_rtp_transport() + + def get_local_media(self, for_offer): + with self._lock: + if self.state not in ["INITIALIZED", "WAIT_ICE", "ESTABLISHED"]: + raise RuntimeError("AudioStream.get_local_media() may only be " + + "called in the INITIALIZED, WAIT_ICE or ESTABLISHED states") + if for_offer: + old_direction = self._audio_transport.direction + if old_direction is None: + new_direction = "sendrecv" + elif "send" in old_direction: + new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv") + else: + new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly") + else: + new_direction = None + return self._audio_transport.get_local_media(for_offer, new_direction) + + def start(self, local_sdp, remote_sdp, stream_index): + with self._lock: + if self.state != "INITIALIZED": + raise RuntimeError("AudioStream.start() may only be " + + "called in the INITIALIZED state") + settings = SIPSimpleSettings() + self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout, + media_check_interval=settings.rtp.timeout) + self._check_hold(self._audio_transport.direction, True) + if self._try_ice: + self.state = 'WAIT_ICE' + else: + self.state = 'ESTABLISHED' + self.notification_center.post_notification('MediaStreamDidStart', sender=self) + + def validate_update(self, remote_sdp, stream_index): + with self._lock: + # TODO: implement + return True + + def update(self, local_sdp, remote_sdp, stream_index): + with self._lock: + connection = remote_sdp.media[stream_index].connection or remote_sdp.connection + if connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port: + settings = SIPSimpleSettings() + old_consumer_slot = self.consumer_slot + old_producer_slot = self.producer_slot + self.notification_center.remove_observer(self, sender=self._audio_transport) + self._audio_transport.stop() + try: + self._audio_transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) + except SIPCoreError, e: + self.state = "ENDED" + self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=e.args[0])) + return + self.notification_center.add_observer(self, sender=self._audio_transport) + self._audio_transport.start(local_sdp, remote_sdp, stream_index, no_media_timeout=settings.rtp.timeout, media_check_interval=settings.rtp.timeout) + self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True, + old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot, + old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) + if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv': + self._audio_transport.update_direction('recvonly') + self._check_hold(self._audio_transport.direction, False) + self.notification_center.post_notification('AudioStreamDidChangeRTPParameters', sender=self) + else: + new_direction = local_sdp.media[stream_index].direction + self._audio_transport.update_direction(new_direction) + self._check_hold(new_direction, False) + self._hold_request = None + + def hold(self): + with self._lock: + if self.on_hold_by_local or self._hold_request == 'hold': + return + if self.state == "ESTABLISHED" and self.direction != "inactive": + self.bridge.remove(self) + self._hold_request = 'hold' + + def unhold(self): + with self._lock: + if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': + return + if self.state == "ESTABLISHED" and self._hold_request == 'hold': + self.bridge.add(self) + self._hold_request = None if self._hold_request == 'hold' else 'unhold' + + def deactivate(self): + pass + + def end(self): + with self._lock: + if self.state != "ENDED": + if self._audio_transport is not None: + self.notification_center.post_notification('MediaStreamWillEnd', sender=self) + self._audio_transport.stop() + self.notification_center.remove_observer(self, sender=self._audio_transport) + self._audio_transport = None + self._rtp_transport = None + self.state = "ENDED" + self.notification_center.post_notification('MediaStreamDidEnd', sender=self) + else: + self.state = "ENDED" + self.bridge.stop() + self.session = None + + def reset(self, stream_index): + with self._lock: + if self.direction == "inactive" and not self.on_hold_by_local: + new_direction = "sendrecv" + self._audio_transport.update_direction(new_direction) + self._check_hold(new_direction, False) + # TODO: do a full reset, re-creating the AudioTransport, so that a new offer + # would contain all codecs and ICE would be renegotiated -Saul + + def send_dtmf(self, digit): + with self._lock: + if self.state != "ESTABLISHED": + raise RuntimeError("AudioStream.send_dtmf() cannot be used in %s state" % self.state) + try: + self._audio_transport.send_dtmf(digit) + except PJSIPError, e: + if not e.args[0].endswith("(PJ_ETOOMANY)"): + raise + + # Notification handling + # + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_RTPTransportDidFail(self, notification): + with self._lock: + self.notification_center.remove_observer(self, sender=notification.sender) + if self.state == "ENDED": + return + self._try_next_rtp_transport(notification.data.reason) + + def _NH_RTPTransportDidInitialize(self, notification): + settings = SIPSimpleSettings() + rtp_transport = notification.sender + with self._lock: + if not rtp_transport.use_ice: + self.notification_center.remove_observer(self, sender=rtp_transport) + if self.state == "ENDED": + return + del self._rtp_args + del self._stun_servers + try: + if hasattr(self, "_incoming_remote_sdp"): + try: + audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index, + codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) + finally: + del self._incoming_remote_sdp + del self._incoming_stream_index + else: + audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) + except SIPCoreError, e: + self.state = "ENDED" + self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=e.args[0])) + return + self._rtp_transport = rtp_transport + self._audio_transport = audio_transport + self.notification_center.add_observer(self, sender=audio_transport) + self.state = "INITIALIZED" + self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) + + def _NH_RTPAudioStreamGotDTMF(self, notification): + self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit)) + + def _NH_RTPAudioTransportDidTimeout(self, notification): + self.notification_center.post_notification('AudioStreamDidTimeout', sender=self) + + def _NH_RTPTransportICENegotiationStateDidChange(self, notification): + self.notification_center.post_notification('AudioStreamICENegotiationStateDidChange', sender=self, data=notification.data) + + def _NH_RTPTransportICENegotiationDidSucceed(self, notification): + self._ice_state = "IN_USE" + rtp_transport = notification.sender + self.notification_center.remove_observer(self, sender=rtp_transport) + with self._lock: + if self.state != "WAIT_ICE": + return + self.notification_center.post_notification('AudioStreamICENegotiationDidSucceed', sender=self, data=notification.data) + self.state = 'ESTABLISHED' + self.notification_center.post_notification('MediaStreamDidStart', sender=self) + + def _NH_RTPTransportICENegotiationDidFail(self, notification): + self._ice_state = "FAILED" + rtp_transport = notification.sender + self.notification_center.remove_observer(self, sender=rtp_transport) + with self._lock: + if self.state != "WAIT_ICE": + return + self.notification_center.post_notification('AudioStreamICENegotiationDidFail', sender=self, data=notification.data) + self.state = 'ESTABLISHED' + self.notification_center.post_notification('MediaStreamDidStart', sender=self) + + # Private methods + # + + def _init_rtp_transport(self, stun_servers=None): + self._rtp_args = dict() + self._rtp_args["use_srtp"] = self._use_srtp + self._rtp_args["srtp_forced"] = self._use_srtp and self._try_forced_srtp + self._rtp_args["use_ice"] = self._try_ice + self._rtp_args["local_rtp_address"] = SIPConfig.local_ip.normalized + self._stun_servers = [(None, None)] + if stun_servers: + self._stun_servers.extend(reversed(stun_servers)) + self._try_next_rtp_transport() + + def _try_next_rtp_transport(self, failure_reason=None): + if self._stun_servers: + stun_address, stun_port = self._stun_servers.pop() + observer_added = False + try: + rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) + self.notification_center.add_observer(self, sender=rtp_transport) + observer_added = True + rtp_transport.set_INIT() + except SIPCoreError, e: + if observer_added: + self.notification_center.remove_observer(self, sender=rtp_transport) + self._try_next_rtp_transport(e.args[0]) + else: + self.state = "ENDED" + self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=failure_reason)) + + def _check_hold(self, direction, is_initial): + was_on_hold_by_local = self.on_hold_by_local + was_on_hold_by_remote = self.on_hold_by_remote + was_inactive = self.direction == "inactive" + self.direction = direction + inactive = self.direction == "inactive" + self.on_hold_by_local = was_on_hold_by_local if inactive else direction == "sendonly" + self.on_hold_by_remote = "send" not in direction + if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold': + self.bridge.add(self) + if not was_on_hold_by_local and self.on_hold_by_local: + self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=True)) + if was_on_hold_by_local and not self.on_hold_by_local: + self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=False)) + if not was_on_hold_by_remote and self.on_hold_by_remote: + self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=True)) + if was_on_hold_by_remote and not self.on_hold_by_remote: + self.notification_center.post_notification('AudioStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=False)) + diff --git a/sylk/applications/xmppgateway/xmpp/jingle/util.py b/sylk/applications/xmppgateway/xmpp/jingle/util.py new file mode 100644 index 0000000..fe8d00f --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/jingle/util.py @@ -0,0 +1,155 @@ +# Copyright (C) 2013 AG Projects. See LICENSE for details +# + +import re + +from collections import defaultdict +from itertools import count +from sipsimple.core import SDPSession, SDPMediaStream, SDPAttribute, SDPConnection + +from sylk.applications.xmppgateway.xmpp.stanzas import jingle + +__all__ = ['jingle_to_sdp', 'sdp_to_jingle'] + + +# IPv4 only for now, I'm sorry +ipv4_re = re.compile("^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}$") + +def content_to_sdpstream(content): + if content.description is None or content.transport is None: + raise ValueError + media_stream = SDPMediaStream(str(content.description.media), 0, 'RTP/AVP') + formats = [] + attributes = [] + for item in content.description.payloads: + formats.append(item.id) + attributes.append(SDPAttribute('rtpmap', '%d %s/%d' % (item.id, str(item.name), item.clockrate))) + if item.maxptime: + attributes.append(SDPAttribute('maxptime', str(item.maxptime))) + if item.ptime: + attributes.append(SDPAttribute('ptime', str(item.ptime))) + if item.parameters: + parameters_str = ';'.join(('%s=%s' % (p.name, p.value) for p in item.parameters)) + attributes.append(SDPAttribute('fmtp', '%d %s' % (item.id, str(parameters_str)))) + media_stream.formats = map(str, formats) + media_stream.attributes = attributes # set attributes so that _codec_list is generated + if content.description.encryption: + if content.description.encryption.required: + media_stream.transport = 'RTP/SAVP' + for crypto in content.description.encryption.cryptos: + crypto_str = '%s %s %s' % (crypto.tag, crypto.crypto_suite, crypto.key_params) + if crypto.session_params: + crypto_str += ' %s' % crypto.session_params + media_stream.attributes.append(SDPAttribute('crypto', str(crypto_str))) + if isinstance(content.transport, jingle.IceUdpTransport): + if content.transport.ufrag: + media_stream.attributes.append(SDPAttribute('ice-ufrag', str(content.transport.ufrag))) + if content.transport.password: + media_stream.attributes.append(SDPAttribute('ice-pwd', str(content.transport.password))) + for candidate in content.transport.candidates: + if not ipv4_re.match(candidate.ip): + continue + candidate_str = '%s %d %s %d %s %d typ %s' % (candidate.foundation, candidate.component, candidate.protocol.upper(), candidate.priority, candidate.ip, candidate.port, candidate.typ) + if candidate.related_addr and candidate.related_port: + candidate_str += ' raddr %s rport %d' % (candidate.related_addr, candidate.related_port) + media_stream.attributes.append(SDPAttribute('candidate', str(candidate_str))) + if content.transport.remote_candidate: + remote_candidate = content.transport.remote_candidate + remote_candidates_str = '%d %s %d' % (remote_candidate.component, remote_candidate.ip, remote_candidate.port) + media_stream.attributes.append(SDPAttribute('remote-candidates', str(remote_candidates_str))) + elif isinstance(content.transport, jingle.RawUdpTransport): + # Nothing to do here + pass + else: + raise ValueError + # Set the proper connection information, pick the first RTP candidate and use that + try: + candidate = next(c for c in content.transport.candidates if c.component == 1 and ipv4_re.match(c.ip)) + except StopIteration: + raise ValueError + media_stream.connection = SDPConnection(str(candidate.ip)) + media_stream.port = candidate.port + + return media_stream + + +def jingle_to_sdp(payload): + sdp = SDPSession('127.0.0.1') + for c in payload.content: + try: + media_stream = content_to_sdpstream(c) + except ValueError: + continue + sdp.media.append(media_stream) + return sdp + + +ice_candidate_re = re.compile(r"""^(?P[a-zA-Z0-9+/]+) (?P\d+) (?P[a-zA-Z]+) (?P\d+) (?P[0-9a-fA-F.:]+) (?P\d+) typ (?P[a-zA-Z]+)(?: raddr (?P[0-9a-fA-F.:]+) rport (?P\d+))?$""", re.MULTILINE) +crypto_re = re.compile(r"""^(?P\d+) (?P[a-zA-Z0-9\_]+) (?P[a-zA-Z0-9\:\+]+)(?: (?P[a-zA-Z0-9\:\+]+))?$""", re.MULTILINE) + +def sdpstream_to_content(sdp, index): + media_stream = sdp.media[index] + content = jingle.Content('initiator', media_stream.media) + content.description = jingle.RTPDescription(media=media_stream.media) + try: + ptime = next(attr.value for attr in media_stream.attributes if attr.name=='ptime') + except StopIteration: + ptime = None + try: + maxptime = next(attr.value for attr in media_stream.attributes if attr.name=='maxptime') + except StopIteration: + maxptime = None + rtp_mappings = media_stream.rtp_mappings.copy() + MediaCodec = rtp_mappings[0].__class__ + rtpmap_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='rtpmap') + rtpmap_codecs = dict([(int(type), MediaCodec(name, rate)) for type, name, rate in media_stream.rtpmap_re.findall(rtpmap_lines)]) + rtp_mappings.update(rtpmap_codecs) + for item in media_stream.formats: + codec = rtp_mappings.get(int(item), None) + if codec is not None: + pt = jingle.PayloadType(int(item), codec.name, codec.rate, 1, ptime=ptime, maxptime=maxptime) + for attr in (attr for attr in media_stream.attributes if attr.name=='fmtp' and attr.value.startswith(item)): + value = attr.value.split(' ', 1)[1] + for v in value.split(';'): + fmtp_name, sep, fmtp_value = v.partition('=') + pt.parameters.append(jingle.Parameter(fmtp_name, fmtp_value)) + content.description.payloads.append(pt) + content.description.encryption = jingle.Encryption(required=media_stream.transport=='RTP/SAVP') + crypto_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='crypto') + for tag, suite, key_params, session_params in crypto_re.findall(crypto_lines): + content.description.encryption.cryptos.append(jingle.Crypto(suite, key_params, tag, session_params)) + if media_stream.has_ice_candidates: + foundation_counter = count(1) + foundation_map = defaultdict(foundation_counter.next) + id_counter = count(100) + if not media_stream.has_ice_attributes and not sdp.has_ice_attributes: + raise ValueError + ufrag_attr = next(attr for attr in media_stream.attributes+sdp.attributes if attr.name=='ice-ufrag') + pwd_attr = next(attr for attr in media_stream.attributes+sdp.attributes if attr.name=='ice-pwd') + content.transport = jingle.IceUdpTransport(ufrag=ufrag_attr.value, pwd=pwd_attr.value) + candidate_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='candidate') + for foundation, component, protocol, priority, ip, port, type, raddr, rport in ice_candidate_re.findall(candidate_lines): + candidate = jingle.ICECandidate(component, foundation_map[foundation], 0, next(id_counter), ip, 0, port, priority, protocol.lower(), type, raddr or None, rport or None) + content.transport.candidates.append(candidate) + # TODO: translate remote-candidate + else: + content.transport = jingle.RawUdpTransport() + connection = media_stream.connection or sdp.connection + if not connection: + raise ValueError + content.transport.candidates.append(jingle.UDPCandidate(1, 0, 100, connection.address, media_stream.port, 'UDP')) + # TODO: component for RTCP + return content + + +def sdp_to_jingle(sdp): + payload = jingle.Jingle(None, None) + # action and sid will be filled up by the session + for index, media_stream in enumerate(sdp.media): + try: + content = sdpstream_to_content(sdp, index) + except ValueError: + continue + payload.content.append(content) + return payload + diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index a567185..d848a0a 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,294 +1,343 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import NotificationCenter, NotificationData -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.jid import JID from wokkel import disco, ping from wokkel.muc import UserPresence from wokkel.xmppim import BasePresenceProtocol, MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.stanzas import (RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza, NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, AvailabilityPresence, SubscriptionPresence, ProbePresence, MUCAvailabilityPresence, GroupChatMessage, IncomingInvitationMessage) +from sylk.applications.xmppgateway.xmpp.stanzas import jingle -__all__ = ['DiscoProtocol', 'MessageProtocol', 'MUCServerProtocol', 'PresenceProtocol'] +__all__ = ['DiscoProtocol', 'JingleProtocol', 'MessageProtocol', 'MUCServerProtocol', 'PresenceProtocol'] class MessageProtocol(MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) msg_type = msg.getAttribute('type') msg_id = msg.getAttribute('id', None) is_empty = msg.body is None and msg.html is None if msg_type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.elements()] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg_id) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message)) return if msg_type in (None, 'normal', 'chat') and not is_empty: body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: use_receipt = False else: use_receipt = elem.name == u'request' if msg_type == 'chat': message = ChatMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message)) return # Check if it's a composing indication if msg_type == 'chat' and is_empty: for elem in msg.elements(): try: elem = next(c for c in msg.elements() if c.uri == CHATSTATES_NS) except StopIteration: pass else: composing_indication = ChatComposingIndication(sender, recipient, elem.name, id=msg_id) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if is_empty: try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: pass else: if elem.name == u'received' and msg_id is not None: receipt = MessageReceipt(sender, recipient, msg_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt)) class PresenceProtocol(PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) NotificationCenter().post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) NotificationCenter().post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class MUCServerProtocol(BasePresenceProtocol): messageTypes = None, 'normal', 'chat', 'groupchat' presenceTypeParserMap = {'available': UserPresence, 'unavailable': UserPresence} def connectionInitialized(self): BasePresenceProtocol.connectionInitialized(self) self.xmlstream.addObserver('/message', self._onMessage) def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType == 'error': return if messageType not in self.messageTypes: message['type'] = 'normal' if messageType == 'groupchat': self.onGroupChat(message) else: to_uri = FrozenURI.parse('xmpp:'+message['to']) if to_uri.host in self.parent.domains: # Check if it's an invitation if message.x is not None and message.x.invite is not None and message.x.invite.uri == MUC_USER_NS: self.onInvitation(message) else: # TODO: give error, private messages not supported pass def onGroupChat(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message)) def onInvitation(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) invited_user_uri = FrozenURI.parse('xmpp:'+msg.x.invite['to']) invited_user = Identity(invited_user_uri) if msg.x.invite.reason is not None and msg.x.invite.reason.uri == MUC_USER_NS: reason = unicode(msg.x.invite.reason) else: reason = None invitation = IncomingInvitationMessage(sender, recipient, invited_user=invited_user, reason=reason, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotInvitation', sender=self.parent, data=NotificationData(invitation=invitation)) def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class DiscoProtocol(disco.DiscoHandler): def info(self, requestor, target, nodeIdentifier): """ Gather data for a disco info request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager if target.host not in xmpp_manager.domains | xmpp_manager.muc_domains: return defer.fail(StanzaError('service-unavailable')) elements = [] + elements.append(disco.DiscoFeature(disco.NS_DISCO_INFO)) + elements.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS)) + elements.append(disco.DiscoFeature('http://sylkserver.com')) if target.host in xmpp_manager.muc_domains: elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc')) if target.user: # We can't say much more here, because the actual conference may end up on a different server elements.append(disco.DiscoFeature('muc_temporary')) elements.append(disco.DiscoFeature('muc_unmoderated')) else: elements.append(disco.DiscoFeature(ping.NS_PING)) if not target.user: elements.append(disco.DiscoIdentity('gateway', 'simple', 'SylkServer')) elements.append(disco.DiscoIdentity('server', 'im', 'SylkServer')) else: - elements.append(disco.DiscoIdentity('account', 'registered')) + elements.append(disco.DiscoIdentity('client', 'pc')) + elements.append(disco.DiscoFeature('urn:ietf:rfc:3264')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/caps')) - - elements.append(disco.DiscoFeature(disco.NS_DISCO_INFO)) - elements.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS)) - elements.append(disco.DiscoFeature('http://sylkserver.com')) + elements.append(disco.DiscoFeature('http://jabber.org/protocol/chatstates')) + elements.append(disco.DiscoFeature(jingle.NS_JINGLE)) + elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP)) + elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_AUDIO)) + #elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_VIDEO)) + elements.append(disco.DiscoFeature(jingle.NS_JINGLE_ICE_UDP_TRANSPORT)) + elements.append(disco.DiscoFeature(jingle.NS_JINGLE_RAW_UDP_TRANSPORT)) return defer.succeed(elements) def items(self, requestor, target, nodeIdentifier): """ Gather data for a disco items request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager items = [] if not target.user and target.host in xmpp_manager.domains: items.append(disco.DiscoItem(JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target.host)), name='Multi-User Chat')) return defer.succeed(items) + +class JingleProtocol(jingle.JingleHandler): + # Functions here need to return immediately so that the IQ result is sent, so schedule them in the reactor + # TODO: review and remove this, just post notifications? + + def onSessionInitiate(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleSessionInitiate', + sender=self.parent, + data=NotificationData(stanza=request)) + + def onSessionTerminate(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleSessionTerminate', + sender=self.parent, + data=NotificationData(stanza=request)) + + def onSessionAccept(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleSessionAccept', + sender=self.parent, + data=NotificationData(stanza=request)) + + def onSessionInfo(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleSessionInfo', + sender=self.parent, + data=NotificationData(stanza=request)) + + def onDescriptionInfo(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleDescriptionInfo', + sender=self.parent, + data=NotificationData(stanza=request)) + + def onTransportInfo(self, request): + reactor.callLater(0, NotificationCenter().post_notification, + 'XMPPGotJingleTransportInfo', + sender=self.parent, + data=NotificationData(stanza=request)) + diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py b/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py new file mode 100644 index 0000000..794be29 --- /dev/null +++ b/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py @@ -0,0 +1,649 @@ + +# Copyright (c) AG Projects +# Copyright (c) Uday Verma +# Copyright (c) Ralph Meijer. +# + +""" +XMPP Jingle Protocol. + +This protocol is specified in + * XEP-0166 - http://xmpp.org/extensions/xep-0166.html + * XEP-0167 - http://xmpp.org/extensions/xep-0167.html + * XEP-0176 - http://xmpp.org/extensions/xep-0176.html + * XEP-0177 - http://xmpp.org/extensions/xep-0177.html +""" + +from twisted.words.xish import domish +from twisted.words.protocols.jabber import error + +from wokkel.generic import Request +from wokkel.subprotocols import IQHandlerMixin, XMPPHandler + + +NS_JINGLE_BASE = 'urn:xmpp:jingle' + +NS_JINGLE = NS_JINGLE_BASE + ':1' +NS_JINGLE_ERRORS = NS_JINGLE_BASE + ':errors:1' + +NS_JINGLE_APPS_RTP = NS_JINGLE_BASE + ':apps:rtp:1' +NS_JINGLE_APPS_RTP_INFO = NS_JINGLE_BASE + ':apps:rtp:info:1' +NS_JINGLE_APPS_RTP_AUDIO = NS_JINGLE_BASE + ':apps:rtp:audio' +NS_JINGLE_APPS_RTP_VIDEO = NS_JINGLE_BASE + ':apps:rtp:video' + +NS_JINGLE_ICE_UDP_TRANSPORT = NS_JINGLE_BASE + ':transports:ice-udp:1' +NS_JINGLE_RAW_UDP_TRANSPORT = NS_JINGLE_BASE + ':transports:raw-udp:1' + +# XPath for Jingle IQ requests +IQ_JINGLE_REQUEST = '/iq[@type="get" or @type="set"]/jingle[@xmlns="' + NS_JINGLE + '"]' + + +class Parameter(object): + """ + A class representing a payload parameter + """ + def __init__(self, name, value): + self.name, self.value = name, value + + @classmethod + def fromElement(cls, element): + return cls(element.getAttribute('name'), element.getAttribute('value')) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'parameter')) + element['name'] = self.name + element['value'] = self.value or '' + return element + + +class Crypto(object): + """ + A crypto method which makes up the encryption to be used + """ + def __init__(self, crypto_suite, key_params, tag, session_params=None): + self.crypto_suite, self.key_params, self.tag, self.session_params = crypto_suite, key_params, tag, session_params + + @classmethod + def fromElement(cls, element): + return cls(element.getAttribute('crypto-suite'), + element.getAttribute('key-params'), + element.getAttribute('tag'), + element.getAttribute('session-params')) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'crypto')) + element['crypto-suite'] = self.crypto_suite + element['key-params'] = self.key_params + if self.session_params: + element['session-params'] = self.session_params + element['tag'] = self.tag + return element + + +class Encryption(object): + """ + A class representing encryption method + """ + def __init__(self, required=False, cryptos=None): + self.required, self.cryptos = required, cryptos or [] + + @classmethod + def fromElement(cls, element): + cryptos = [] + for child in element.elements(): + if child.name == 'crypto': + cryptos.append(Crypto.fromElement(child)) + # TODO: parse ZRTP elements + required = element.hasAttribute('required') and (element.getAttribute('required').lower() in ['true', '1']) + return cls(required, cryptos) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'encryption')) + if self.required: + element['required'] = '1' + + for c in self.cryptos: + element.addChild(c.toElement(defaultUri)) + return element + + +class Bandwidth(object): + """ + A class representing the bandwidth element + """ + def __init__(self, typ, value): + self.typ, self.value = typ, value + + @classmethod + def fromElement(cls, element): + return cls(element.getAttribute('type'), str(element)) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'bandwidth')) + element['type'] = self.typ + element.addContent(self.value) + return element + + +class PayloadType(object): + """ + A class representing payload type + """ + def __init__(self, id, name, clockrate=0, channels=0, maxptime=None, ptime=None, parameters=None): + self.id, self.name, self.clockrate, self.channels, \ + self.maxptime, self.ptime, self.parameters = \ + id, name, clockrate, channels, maxptime, ptime, parameters or [] + + @classmethod + def fromElement(cls, element): + def _sga(v, t): + """ + SafeGetAttribute + """ + try: + return t(element.getAttribute(v)) + except (TypeError, ValueError): + return None + + params = [] + for c in element.children: + params.append(Parameter.fromElement(c)) + + return cls(int(element.getAttribute('id')), + element.getAttribute('name'), + _sga('clockrate', int) or 0, + _sga('channels', int) or 0, + _sga('maxptime', int) or 0, + _sga('ptime', int) or 0, + params) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'payload-type')) + + def _aiv(k, v): + """ + AppendIfValid + """ + if v: + element[k] = str(v) + + element['id'] = str(self.id) + + _aiv('name', self.name) + _aiv('clockrate', self.clockrate) + _aiv('channels', self.channels) + _aiv('maxptime', self.maxptime) + _aiv('ptime', self.ptime) + + for p in self.parameters: + element.addChild(p.toElement()) + + return element + + +class ICECandidate(object): + """ + A class representing an ICE candidate + """ + def __init__(self, component, foundation, generation, + id, ip, network, port, priority, protocol, typ, + related_addr=None, related_port=0): + self.component, self.foundation, self.generation, \ + self.id, self.ip, self.network, self.port, self.priority, \ + self.protocol, self.typ, self.related_addr, self.related_port = \ + component, foundation, generation, \ + id, ip, network, port, priority, protocol, typ, \ + related_addr, related_port + + @classmethod + def fromElement(cls, element): + def _gas(*names): + """ + GetAttributeS + """ + def default_val(t): + return None if t is str else t() + + return [(t(element.getAttribute(name)) if element.hasAttribute(name) else default_val(t)) for name, t in names] + + return cls(*_gas(('component', int), ('foundation', int), + ('generation', int), ('id', str), ('ip', str), + ('network', int), ('port', int), ('priority', int), ('protocol', str), + ('type', str), ('rel-addr', str), ('rel-port', int))) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'candidate')) + def _aas(*names): + """ + AddAttributeS + """ + for n, v in names: + if v is not None: + element[n] = str(v) + + _aas(*[('component', self.component), + ('foundation', self.foundation), + ('generation', self.generation), + ('id', self.id), + ('ip', self.ip), + ('network', self.network), + ('port', self.port), + ('priority', self.priority), + ('protocol', self.protocol), + ('type', self.typ), + ('rel-addr', self.related_addr), + ('rel-port', self.related_port)]) + return element + + +class UDPCandidate(object): + """ + A class representing a UDP candidate + """ + def __init__(self, component, generation, id_, ip, port, protocol, type=None): + self.component = component + self.generation = generation + self.id = id_ + self.ip = ip + self.port = port + self.protocol = protocol + self.type = type + + @classmethod + def fromElement(cls, element): + def _gas(*names): + """ + GetAttributeS + """ + def default_val(t): + return None if t is str else t() + + return [(t(element.getAttribute(name)) if element.hasAttribute(name) else default_val(t)) for name, t in names] + + return cls(*_gas(('component', int), ('generation', int), + ('id', str), ('ip', str), + ('port', int), ('protocol', str), + ('type', str))) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'candidate')) + def _aas(*names): + """ + AddAttributeS + """ + for n, v in names: + if v: + element[n] = str(v) + + _aas(*[('component', self.component), + ('generation', self.generation), + ('id', self.id), + ('ip', self.ip), + ('port', self.port), + ('protocol', self.protocol), + ('type', self.type)]) + return element + + +class ICERemoteCandidate(object): + """ + A class represeting a remote candidate entity + """ + def __init__(self, component, ip, port): + self.component, self.ip, self.port = component, ip, port + + @classmethod + def fromElement(cls, element): + return cls(int(element.getAttribute('component') or '0'), + element.getAttribute('ip'), + int(element.getAttribute('port') or '0')) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri, 'remote-candidate')) + element['component'] = str(self.component) + element['ip'] = self.ip + element['port'] = str(self.port) + return element + + +class IceUdpTransport(object): + """ + Represents the ICE-UDP transport type + """ + def __init__(self, pwd=None, ufrag=None, candidates=None, remote_candidate=None): + self.password, self.ufrag, self.candidates, self.remote_candidate = \ + pwd, ufrag, candidates or [], remote_candidate + + @classmethod + def fromElement(cls, element): + password = element.getAttribute('pwd') or None + ufrag = element.getAttribute('ufrag') or None + + candidates = [] + remote_candidate = None + for child in element.elements(): + if child.name == 'remote-candidate' and remote_candidate is None: + remote_candidate = ICERemoteCandidate.fromElement(child) + elif child.name == 'candidate': + candidates.append(ICECandidate.fromElement(child)) + + return cls(pwd=password, ufrag=ufrag, candidates=candidates, + remote_candidate=remote_candidate) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri or NS_JINGLE_ICE_UDP_TRANSPORT, 'transport')) + if self.password: + element['pwd'] = self.password + if self.ufrag: + element['ufrag'] = self.ufrag + + if self.remote_candidate: + element.addChild(self.remote_candidate.toElement()) + elif self.candidates: + for c in self.candidates: + element.addChild(c.toElement()) + + return element + + +class RawUdpTransport(object): + """ + Represents the Raw-UDP transport type + """ + def __init__(self, candidates=None): + self.candidates = candidates or [] + + @classmethod + def fromElement(cls, element): + candidates = [] + for child in element.elements(): + if child.name == 'candidate': + candidates.append(UDPCandidate.fromElement(child)) + + return cls(candidates=candidates) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri or NS_JINGLE_RAW_UDP_TRANSPORT, 'transport')) + for c in self.candidates: + element.addChild(c.toElement()) + return element + + +class RTPDescription(object): + """ + A class representing a RTP description + """ + def __init__(self, name=None, media=None, ssrc=None, payloads=None, + encryption=None, bandwidth=None): + self.name, self.media, self.ssrc, self.payloads, \ + self.encryption, self.bandwidth = \ + name, media, ssrc, payloads or [], encryption, bandwidth + + @classmethod + def fromElement(cls, element): + plds = [] + encryption, bandwidth = None, None + + for child in element.elements(): + if child.name == 'payload-type': + plds.append(PayloadType.fromElement(child)) + if child.name == 'encryption': + encryption = Encryption.fromElement(child) + if child.name == 'bandwidth': + bandwidth = Bandwidth.fromElement(child) + + return cls(element.getAttribute('name'), + element.getAttribute('media'), + element.getAttribute('ssrc'), plds, encryption, + bandwidth) + + def toElement(self, defaultUri=None): + element = domish.Element((defaultUri or NS_JINGLE_APPS_RTP, 'description')) + if self.name: + element['name'] = self.name + if self.media: + element['media'] = self.media + for p in self.payloads: + element.addChild(p.toElement(defaultUri)) + if self.encryption: + element.addChild(self.encryption.toElement(defaultUri)) + if self.bandwidth: + element.addChild(self.bandwidth.toElement(defaultUri)) + + return element + + +class Content(object): + """ + A class indicating a single content item within a jingle request. + """ + def __init__(self, creator, name, disposition=None, senders=None): + self.creator, self.name, self.disposition, self.senders = \ + creator, name, disposition, senders + self.description = None + self.transport = None + + @classmethod + def fromElement(cls, element): + creator = element.getAttribute('creator') + name = element.getAttribute('name') + disposition = element.getAttribute('disposition') + senders = element.getAttribute('senders') + + description, transport = None, None + for c in element.elements(): + if c.name == 'description' and c.uri == NS_JINGLE_APPS_RTP: + description = RTPDescription.fromElement(c) + elif c.name == 'transport' and c.uri == NS_JINGLE_ICE_UDP_TRANSPORT: + transport = IceUdpTransport.fromElement(c) + elif c.name == 'transport' and c.uri == NS_JINGLE_RAW_UDP_TRANSPORT: + transport = RawUdpTransport.fromElement(c) + + ret = cls(creator, name, disposition, senders) + ret.description = description + ret.transport = transport + return ret + + def toElement(self): + element = domish.Element((None, 'content')) + element['creator'] = self.creator + element['name'] = self.name + if self.disposition: + element['disposition'] = self.disposition + if self.senders: + element['senders'] = self.senders + if self.description: + element.addChild(self.description.toElement()) + if self.transport: + element.addChild(self.transport.toElement()) + return element + + +class EmptyType(unicode): + @classmethod + def fromElement(cls, element): + return cls(element.name) + + def toElement(self): + return domish.Element((None, self)) + +class ReasonType(EmptyType): + pass + +class AlternativeSessionReason(unicode): + def __new__(cls, value): + obj = unicode.__new__(cls, 'alternative-session') + obj.sid = value + return obj + + @classmethod + def fromElement(cls, element): + return cls(element.firstChildElement().children[0]) + + def toElement(self): + element = domish.Element((None, self)) + element.addElement('sid', content=self.sid) + return element + +class Reason(object): + def __init__(self, reason, text=None): + self.value = reason + self.text = text + + @classmethod + def fromElement(cls, element): + reason = None + text = None + for c in element.children: + if c.name == 'text': + text = c.children[0] + elif c.name == 'alternative-session': + reason = AlternativeSessionReason.fromElement(c) + else: + reason = ReasonType.fromElement(c) + return cls(reason, text) + + def toElement(self): + element = domish.Element((None, 'reason')) + element.addChild(self.value.toElement()) + if self.text: + element.addElement('text', content=self.text) + return element + +class Info(unicode): + + @classmethod + def fromElement(cls, element): + return cls(element.name) + + def toElement(self): + return domish.Element((NS_JINGLE_APPS_RTP_INFO, self)) + +class MuteInfo(Info): + + def __new__(cls, value, creator, name): + obj = unicode.__new__(cls, value) + obj.creator = creator + obj.name = name + return obj + + @classmethod + def fromElement(cls, element): + return cls(element.name, element['creator'], element['name']) + + def toElement(self): + element = super(MuteInfo, self).toElement() + element['creator'] = self.creator + element['name'] = self.name + return element + + +class Jingle(object): + """ + A class representing a Jingle element within an IQ request + """ + def __init__(self, action, sid, initiator=None, responder=None, content=None, reason=None, info=None): + self.action = action + self.sid = sid + self.initiator = initiator + self.responder = responder + self.reason = reason + self.info = info + if not hasattr(content, '__iter__'): + if content is not None: + self.content = [content] + else: + self.content = [] + else: + self.content = content + + @classmethod + def fromElement(cls, element): + action = element.getAttribute('action') + initiator = element.getAttribute('initiator') + responder = element.getAttribute('responder') + sid = element.getAttribute('sid') + + content = [] + reason = None + info = None + for c in element.elements(): + if c.name == 'content': + content.append(Content.fromElement(c)) + elif c.name == 'reason': + reason = Reason.fromElement(c) + elif c.uri == NS_JINGLE_APPS_RTP_INFO: + if c.name in ('mute', 'unmute'): + info = MuteInfo.fromElement(c) + else: + info = Info.fromElement(c) + return cls(action, sid, initiator, responder, content=content, reason=reason, info=info) + + def toElement(self): + element = domish.Element((NS_JINGLE, 'jingle')) + element['action'] = self.action + element['sid'] = self.sid + if self.initiator: + element['initiator'] = self.initiator + if self.responder: + element['responder'] = self.responder + for c in self.content: + element.addChild(c.toElement()) + if self.reason: + element.addChild(self.reason.toElement()) + if self.info: + element.addChild(self.info.toElement()) + return element + + +class JingleIq(Request): + stanzaKind = 'iq' + stanzaType = 'set' + timeout = None + childParsers = {(NS_JINGLE, 'jingle'): '_parseJingleElement'} + + def __init__(self, sender=None, recipient=None, jingle=None): + Request.__init__(self, recipient, sender, self.stanzaType) + self.jingle = jingle + + def _parseJingleElement(self, element): + self.jingle = Jingle.fromElement(element) + + def toElement(self): + element = Request.toElement(self) + element.addChild(self.jingle.toElement()) + return element + + +class JingleHandler(XMPPHandler, IQHandlerMixin): + + iqHandlers = {IQ_JINGLE_REQUEST: '_onJingleRequest'} + + def connectionInitialized(self): + self.xmlstream.addObserver(IQ_JINGLE_REQUEST, self.handleRequest) + + def sessionTerminate(self, sender, recipient, sid, reason=None): + jingle = Jingle('session-terminate', sid, reason=reason) + return JingleIq(sender=sender, recipient=recipient, jingle=jingle) + + def sessionInfo(self, sender, recipient, sid, info=None): + jingle = Jingle('session-info', sid, info=info) + return JingleIq(sender=sender, recipient=recipient, jingle=jingle) + + def sessionAccept(self, sender, recipient, payload): + payload.action = 'session-accept' + return JingleIq(sender=sender, recipient=recipient, jingle=payload) + + def sessionInitiate(self, sender, recipient, payload): + payload.action = 'session-initiate' + return JingleIq(sender=sender, recipient=recipient, jingle=payload) + + def _onJingleRequest(self, iq): + request = JingleIq.fromElement(iq) + method_name = 'on'+''.join(item.capitalize() for item in request.jingle.action.lower().split('-')) + handler = getattr(self, method_name, None) + if callable(handler): + handler(request) + else: + raise error.StanzaError('bad-request') +