diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index c4186b0..ff6091c 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,329 +1,332 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.account import AccountManager from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import ContactHeader, FromHeader, ToHeader from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.configuration import SIPConfig from sylk.session import Session __all__ = ['MediaSessionHandler'] class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) - self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams) + self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) - self.jingle_session.accept(self.jingle_session.proposed_streams) + self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed (%s)" % (notification.sender._invitation.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionGotProposal(self, notification): self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() + def _NH_SIPSessionGotConferenceInfo(self, notification): + self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) + def _NH_JingleSessionDidStart(self, notification): log.msg("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.msg("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.msg("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/applications/xmppgateway/xmpp/jingle/session.py b/sylk/applications/xmppgateway/xmpp/jingle/session.py index 3aa9533..f91ba85 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/session.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/session.py @@ -1,741 +1,773 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # import random import string from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton +from cStringIO import StringIO from datetime import datetime from eventlib import api, coros, proc from eventlib.twistedutil import block_on +from lxml import etree from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPSession, SDPMediaStream, SDPConnection, SDPNegotiator from sipsimple.core import SIPCoreError from sipsimple.threading import run_in_twisted_thread from twisted.internet import reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import TimeoutError as IqTimeoutError from zope.interface import implements from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sylk.applications.xmppgateway.xmpp.jingle.util import jingle_to_sdp, sdp_to_jingle from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.configuration import SIPConfig def random_id(): return ''.join(random.choice(string.ascii_letters+string.digits) for x in xrange(32)) class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class Operation(object): __params__ = () def __init__(self, **params): for name, value in params.iteritems(): setattr(self, name, value) for param in set(self.__params__).difference(params): raise ValueError("missing operation parameter: '%s'" % param) self.channel = coros.queue() class AcceptOperation(Operation): - __params__ = ('streams',) + __params__ = ('streams', 'is_focus') class SendRingIndicationOperation(Operation): __params__ = () class RejectOperation(Operation): __params__ = ('reason',) class EndOperation(Operation): __params__ = () class HoldOperation(Operation): __params__ = () class UnholdOperation(Operation): __params__ = () class ProcessRemoteOperation(Operation): __params__ = ('notification',) class ConnectOperation(Operation): - __params__ = ('sender', 'recipient', 'streams') + __params__ = ('sender', 'recipient', 'streams', 'is_focus') + + +class SendConferenceInfoOperation(Operation): + __params__ = ('xml',) class JingleSession(object): implements(IObserver) jingle_stanza_timeout = 3 media_stream_timeout = 15 def __init__(self, protocol): self.account = AccountManager().sylkserver_account self._protocol = protocol self._id = None self._local_identity = None self._remote_identity = None self._local_jid = None self._remote_jid = None self._channel = coros.queue() self._current_operation = None self._proc = proc.spawn(self._run) self._timer = None self._sdp_negotiator = None self._pending_transport_info_stanzas = [] self.direction = None self.state = None self.streams = None self.proposed_streams = None self.start_time = None self.end_time = None self.on_hold = False + self.local_focus = False def init_incoming(self, stanza): self._id = stanza.jingle.sid self._local_identity = Identity(FrozenURI.parse(stanza.recipient)) self._remote_identity = Identity(FrozenURI.parse(stanza.sender)) self._local_jid = self._local_identity.uri.as_xmpp_jid() self._remote_jid = self._remote_identity.uri.as_xmpp_jid() remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator = SDPNegotiator.create_with_remote_offer(remote_sdp) except SIPCoreError, e: self._fail(originator='local', reason='general-error', description=str(e)) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self.direction = 'incoming' self.state = 'incoming' NotificationCenter().post_notification('JingleSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams)) else: self._fail(originator='local', reason='unsupported-applications') - def connect(self, sender_identity, recipient_identity, streams): - self._schedule_operation(ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams)) + def connect(self, sender_identity, recipient_identity, streams, is_focus=False): + self._schedule_operation(ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams, is_focus=is_focus)) def send_ring_indication(self): self._schedule_operation(SendRingIndicationOperation()) - def accept(self, streams): - self._schedule_operation(AcceptOperation(streams=streams)) + def accept(self, streams, is_focus=False): + self._schedule_operation(AcceptOperation(streams=streams, is_focus=is_focus)) def reject(self, reason='busy'): self._schedule_operation(RejectOperation(reason=reason)) def hold(self): self._schedule_operation(HoldOperation()) def unhold(self): self._schedule_operation(UnholdOperation()) def end(self): self._schedule_operation(EndOperation()) def add_stream(self): raise NotImplementedError def remove_stream(self): raise NotImplementedError @property def id(self): return self._id @property def local_identity(self): return self._local_identity @property def remote_identity(self): return self._remote_identity + @run_in_twisted_thread + def _send_conference_info(self, xml): + # This function is not meant for users to call, entities with knowledge about JingleSession + # internals will call it, such as the MediaSessionHandler + self._schedule_operation(SendConferenceInfoOperation(xml=xml)) + def _send_stanza(self, stanza): if self.direction == 'incoming': stanza.jingle.initiator = unicode(self._remote_jid) stanza.jingle.responder = unicode(self._local_jid) else: stanza.jingle.initiator = unicode(self._local_jid) stanza.jingle.responder = unicode(self._remote_jid) stanza.timeout = self.jingle_stanza_timeout return self._protocol.request(stanza) def _fail(self, originator='local', reason='general-error', description=None): reason = jingle.Reason(jingle.ReasonType(reason), text=description) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' failure_str = '%s%s' % (reason, ' %s' % description if description else '') NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=failure_str)) self._channel.send_exception(proc.ProcExit) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidInitialize(self, notification): if self._current_operation is not None: self._current_operation.channel.send(notification) def _NH_MediaStreamDidStart(self, notification): if self._current_operation is not None: self._current_operation.channel.send(notification) def _NH_MediaStreamDidFail(self, notification): if self._current_operation is not None: self._current_operation.channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: self.end() def _NH_XMPPGotJingleSessionAccept(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleSessionTerminate(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleSessionInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleDescriptionInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleTransportInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) # Operation handling @run_in_twisted_thread def _schedule_operation(self, operation): self._channel.send(operation) def _run(self): while True: self._current_operation = op = self._channel.wait() try: handler = getattr(self, '_OH_%s' % op.__class__.__name__) handler(op) except BaseException: self._proc = None raise finally: self._current_operation = None def _OH_AcceptOperation(self, operation): if self.state != 'incoming': return notification_center = NotificationCenter() settings = SIPSimpleSettings() streams = operation.streams for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._sdp_negotiator.current_remote local_ip = SIPConfig.local_ip.normalized local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(for_offer=False) else: media = SDPMediaStream.new(media) media.port = 0 media.attributes = [] local_sdp.media.append(media) try: self._sdp_negotiator.set_local_answer(local_sdp) self._sdp_negotiator.negotiate() except SIPCoreError, e: self._fail(originator='local', reason='incompatible-parameters', description=str(e)) return + self.local_focus = operation.is_focus + notification_center.post_notification('JingleSessionWillStart', sender=self) # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote # Build the payload and send it over payload = sdp_to_jingle(local_sdp) payload.sid = self._id + if self.local_focus: + payload.conference_info = jingle.ConferenceInfo(True) stanza = self._protocol.sessionAccept(self._local_jid, self._remote_jid, payload) d = self._send_stanza(stanza) block_on(d) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except (MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, IqTimeoutError): error = 'timeout sending IQ stanza' elif isinstance(e, StanzaError): error = str(e.condition) else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification_center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) def _OH_ConnectOperation(self, operation): if self.state is not None: return settings = SIPSimpleSettings() notification_center = NotificationCenter() self.direction = 'outgoing' self.state = 'connecting' self.proposed_streams = operation.streams + self.local_focus = operation.is_focus self._id = random_id() self._local_identity = operation.sender self._remote_identity = operation.recipient self._local_jid = self._local_identity.uri.as_xmpp_jid() self._remote_jid = self._remote_identity.uri.as_xmpp_jid() notification_center.post_notification('JingleSessionNewOutgoing', self, NotificationData(streams=operation.streams)) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 # Build local SDP and negotiator local_ip = SIPConfig.local_ip.normalized local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(for_offer=True) local_sdp.media.append(media) self._sdp_negotiator = SDPNegotiator.create_with_local_offer(local_sdp) # Build the payload and send it over payload = sdp_to_jingle(local_sdp) payload.sid = self._id + if self.local_focus: + payload.conference_info = jingle.ConferenceInfo(True) stanza = self._protocol.sessionInitiate(self._local_jid, self._remote_jid, payload) d = self._send_stanza(stanza) block_on(d) except (MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, IqTimeoutError): error = 'timeout sending IQ stanza' elif isinstance(e, StanzaError): error = str(e.condition) elif isinstance(e, SIPCoreError): error = str(e) else: error = 'media stream failed: %s' % e.data.reason self.state = 'terminated' NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=error)) self._channel.send_exception(proc.ProcExit) else: self._timer = reactor.callLater(settings.sip.invite_timeout, self.end) def _OH_RejectOperation(self, operation): if self.state != 'incoming': return reason = jingle.Reason(jingle.ReasonType(operation.reason)) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' self._channel.send_exception(proc.ProcExit) def _OH_EndOperation(self, operation): if self.state not in ('connecting', 'connected'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None prev_state = self.state self.state = 'terminating' notification_center = NotificationCenter() notification_center.post_notification('JingleSessionWillEnd', self) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() if prev_state == 'connected': reason = jingle.Reason(jingle.ReasonType('success')) else: reason = jingle.Reason(jingle.ReasonType('cancel')) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' if prev_state == 'connected': self.end_time = datetime.now() notification_center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='local')) else: notification_center.post_notification('JingleSessionDidFail', self, NotificationData(originator='local', reason='cancel')) for stream in streams: stream.end() self._channel.send_exception(proc.ProcExit) def _OH_SendRingIndicationOperation(self, operation): if self.state != 'incoming': return stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('ringing')) self._send_stanza(stanza) def _OH_HoldOperation(self, operation): if self.state != 'connected': return if self.on_hold: return self.on_hold = True for stream in self.streams: stream.hold() stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('hold')) self._send_stanza(stanza) NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=False)) def _OH_UnholdOperation(self, operation): if self.state != 'connected': return if not self.on_hold: return self.on_hold = False for stream in self.streams: stream.unhold() stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('unhold')) self._send_stanza(stanza) NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) + def _OH_SendConferenceInfoOperation(self, operation): + if self.state != 'connected': + return + if not self.local_focus: + return + tree = etree.parse(StringIO(operation.xml)) + tree.getroot().attrib['sid'] = self._id # FIXME: non-standard, but Jitsi does it + data = etree.tostring(tree, xml_declaration=False) # Strip the XML heading + stanza = jingle.ConferenceInfoIq(sender=self._local_jid, recipient=self._remote_jid, payload=data) + stanza.timeout = self.jingle_stanza_timeout + self._protocol.request(stanza) + def _OH_ProcessRemoteOperation(self, operation): notification = operation.notification stanza = notification.data.stanza if notification.name == 'XMPPGotJingleSessionTerminate': if self.state not in ('incoming', 'connecting', 'connected_pending_accept', 'connected'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None # Session ended remotely prev_state = self.state self.state = 'terminated' if prev_state == 'incoming': reason = stanza.jingle.reason.value if stanza.jingle.reason else 'cancel' notification.center.post_notification('JingleSessionDidFail', self, NotificationData(originator='remote', reason=reason)) else: notification.center.post_notification('JingleSessionWillEnd', self, NotificationData(originator='remote')) streams = self.proposed_streams if prev_state == 'connecting' else self.streams for stream in streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.end_time = datetime.now() notification.center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='remote')) self._channel.send_exception(proc.ProcExit) elif notification.name == 'XMPPGotJingleSessionInfo': info = stanza.jingle.info if not info: return if info == 'ringing': if self.state not in ('connecting', 'connected_pending_accept'): return notification.center.post_notification('JingleSessionGotRingIndication', self) elif info in ('hold', 'unhold'): if self.state != 'connected': return notification.center.post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=info=='hold', partial=False)) elif notification.name == 'XMPPGotJingleDescriptionInfo': if self.state != 'connecting': return # Add candidates acquired on transport-info stanzas for s in self._pending_transport_info_stanzas: for c in s.jingle.content: content = next(content for content in stanza.jingle.content if content.name == c.name) content.transport.candidates.extend(c.transport.candidates) if isinstance(content.transport, jingle.IceUdpTransport): if not content.transport.ufrag and c.transport.ufrag: content.transport.ufrag = c.transport.ufrag if not content.transport.password and c.transport.password: content.transport.password = c.transport.password remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator.set_remote_answer(remote_sdp) self._sdp_negotiator.negotiate() except SIPCoreError: # The description-info stanza may have been just a parameter change, not a full 'SDP' return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None del self._pending_transport_info_stanzas[:] # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote notification.center.post_notification('JingleSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() try: with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except (MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected_pending_accept' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() # Hold the streams to prevent real RTP from flowing for stream in self.streams: stream.hold() elif notification.name == 'XMPPGotJingleSessionAccept': if self.state not in ('connecting', 'connected_pending_accept'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None if self.state == 'connected_pending_accept': # We already negotiated ICE and media is 'flowing' (not really because streams are on hold) # unhold the streams and pretend the session just started for stream in self.streams: stream.unhold() self.state = 'connected' notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) return # Add candidates acquired on transport-info stanzas for s in self._pending_transport_info_stanzas: for c in s.jingle.content: content = next(content for content in stanza.jingle.content if content.name == c.name) content.transport.candidates.extend(c.transport.candidates) if isinstance(content.transport, jingle.IceUdpTransport): if not content.transport.ufrag and c.transport.ufrag: content.transport.ufrag = c.transport.ufrag if not content.transport.password and c.transport.password: content.transport.password = c.transport.password del self._pending_transport_info_stanzas[:] remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator.set_remote_answer(remote_sdp) self._sdp_negotiator.negotiate() except SIPCoreError, e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', reason='incompatible-parameters', description=str(e)) return # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote notification.center.post_notification('JingleSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() try: with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except (MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) elif notification.name == 'XMPPGotJingleTransportInfo': if self.state != 'connecting': # ICE trickling not supported yet, so only accept candidates before accept return self._pending_transport_info_stanzas.append(stanza) class JingleSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='JingleSessionNewIncoming') notification_center.add_observer(self, name='JingleSessionNewOutgoing') notification_center.add_observer(self, name='JingleSessionDidFail') notification_center.add_observer(self, name='JingleSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='JingleSessionNewIncoming') notification_center.remove_observer(self, name='JingleSessionNewOutgoing') notification_center.remove_observer(self, name='JingleSessionDidFail') notification_center.remove_observer(self, name='JingleSessionDidEnd') def handle_notification(self, notification): if notification.name in ('JingleSessionNewIncoming', 'JingleSessionNewOutgoing'): session = notification.sender self.sessions[session.id] = session elif notification.name in ('JingleSessionDidFail', 'JingleSessionDidEnd'): session = notification.sender del self.sessions[session.id] diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index 2f5e2f6..ac1b6de 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,381 +1,383 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import NotificationCenter, NotificationData from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.jid import JID from wokkel import disco, muc, ping, xmppim from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.stanzas import (RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza, NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, AvailabilityPresence, SubscriptionPresence, ProbePresence, MUCAvailabilityPresence, GroupChatMessage, IncomingInvitationMessage) from sylk.applications.xmppgateway.xmpp.stanzas import jingle __all__ = ['DiscoProtocol', 'JingleProtocol', 'MessageProtocol', 'MUCServerProtocol', 'MUCPresenceProtocol', 'PresenceProtocol'] class MessageProtocol(xmppim.MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) msg_type = msg.getAttribute('type') msg_id = msg.getAttribute('id', None) is_empty = msg.body is None and msg.html is None if msg_type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.elements()] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg_id) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message)) return if msg_type in (None, 'normal', 'chat') and not is_empty: body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: use_receipt = False else: use_receipt = elem.name == u'request' if msg_type == 'chat': message = ChatMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message)) return # Check if it's a composing indication if msg_type == 'chat' and is_empty: for elem in msg.elements(): try: elem = next(c for c in msg.elements() if c.uri == CHATSTATES_NS) except StopIteration: pass else: composing_indication = ChatComposingIndication(sender, recipient, elem.name, id=msg_id) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if is_empty: try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: pass else: if elem.name == u'received' and msg_id is not None: receipt = MessageReceipt(sender, recipient, msg_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt)) class PresenceProtocol(xmppim.PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) NotificationCenter().post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) NotificationCenter().post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class MUCServerProtocol(xmppim.BasePresenceProtocol): messageTypes = None, 'normal', 'chat', 'groupchat' presenceTypeParserMap = {'available': muc.UserPresence, 'unavailable': muc.UserPresence} def connectionInitialized(self): self.xmlstream.addObserver('/presence/x[@xmlns="%s"]' % muc.NS_MUC, self._onPresence) self.xmlstream.addObserver('/message', self._onMessage) def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType == 'error': return if messageType not in self.messageTypes: message['type'] = 'normal' if messageType == 'groupchat': self.onGroupChat(message) else: to_uri = FrozenURI.parse('xmpp:'+message['to']) if to_uri.host in self.parent.domains: # Check if it's an invitation if message.x is not None and message.x.invite is not None and message.x.invite.uri == MUC_USER_NS: self.onInvitation(message) else: # TODO: give error, private messages not supported pass def onGroupChat(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message)) def onInvitation(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) invited_user_uri = FrozenURI.parse('xmpp:'+msg.x.invite['to']) invited_user = Identity(invited_user_uri) if msg.x.invite.reason is not None and msg.x.invite.reason.uri == MUC_USER_NS: reason = unicode(msg.x.invite.reason) else: reason = None invitation = IncomingInvitationMessage(sender, recipient, invited_user=invited_user, reason=reason, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotInvitation', sender=self.parent, data=NotificationData(invitation=invitation)) def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class DiscoProtocol(disco.DiscoHandler): def info(self, requestor, target, nodeIdentifier): """ Gather data for a disco info request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager if target.host not in xmpp_manager.domains | xmpp_manager.muc_domains: return defer.fail(StanzaError('service-unavailable')) elements = [] elements.append(disco.DiscoFeature(disco.NS_DISCO_INFO)) elements.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS)) elements.append(disco.DiscoFeature('http://sylkserver.com')) if target.host in xmpp_manager.muc_domains: elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc')) elements.append(disco.DiscoFeature('urn:ietf:rfc:3264')) + elements.append(disco.DiscoFeature('urn:xmpp:coin')) elements.append(disco.DiscoFeature(jingle.NS_JINGLE)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_AUDIO)) #elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_VIDEO)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_ICE_UDP_TRANSPORT)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_RAW_UDP_TRANSPORT)) if target.user: # We can't say much more here, because the actual conference may end up on a different server elements.append(disco.DiscoFeature('muc_temporary')) elements.append(disco.DiscoFeature('muc_unmoderated')) else: elements.append(disco.DiscoFeature(ping.NS_PING)) if not target.user: elements.append(disco.DiscoIdentity('gateway', 'simple', 'SylkServer')) elements.append(disco.DiscoIdentity('server', 'im', 'SylkServer')) else: elements.append(disco.DiscoIdentity('client', 'pc')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/caps')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/chatstates')) elements.append(disco.DiscoFeature('urn:ietf:rfc:3264')) + elements.append(disco.DiscoFeature('urn:xmpp:coin')) elements.append(disco.DiscoFeature(jingle.NS_JINGLE)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_AUDIO)) #elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_VIDEO)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_ICE_UDP_TRANSPORT)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_RAW_UDP_TRANSPORT)) return defer.succeed(elements) def items(self, requestor, target, nodeIdentifier): """ Gather data for a disco items request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager items = [] if not target.user and target.host in xmpp_manager.domains: items.append(disco.DiscoItem(JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target.host)), name='Multi-User Chat')) return defer.succeed(items) class JingleProtocol(jingle.JingleHandler): # Functions here need to return immediately so that the IQ result is sent, so schedule them in the reactor # TODO: review and remove this, just post notifications? def onSessionInitiate(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionInitiate', sender=self.parent, data=NotificationData(stanza=request, protocol=self)) def onSessionTerminate(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionTerminate', sender=self.parent, data=NotificationData(stanza=request)) def onSessionAccept(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionAccept', sender=self.parent, data=NotificationData(stanza=request)) def onSessionInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionInfo', sender=self.parent, data=NotificationData(stanza=request)) def onDescriptionInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleDescriptionInfo', sender=self.parent, data=NotificationData(stanza=request)) def onTransportInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleTransportInfo', sender=self.parent, data=NotificationData(stanza=request)) class MUCPresenceProtocol(xmppim.PresenceProtocol): """Protocol implementation to handle presence subscription to MUC URIs """ def subscribeReceived(self, stanza): """ Subscription request was received. """ self.subscribed(stanza.sender, sender=stanza.recipient) self.send_available(stanza) def unsubscribeReceived(self, stanza): """ Unsubscription request was received. """ self.unsubscribed(stanza.sender, sender=stanza.recipient) def probeReceived(self, stanza): """ Probe presence was received. """ self.send_available(stanza) def send_available(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) available = AvailabilityPresence(sender=recipient, recipient=sender) self.send(available.to_xml_element()) diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py b/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py index 89cbbd0..3c7a133 100644 --- a/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py +++ b/sylk/applications/xmppgateway/xmpp/stanzas/jingle.py @@ -1,650 +1,693 @@ # Copyright (c) AG Projects # Copyright (c) Uday Verma # Copyright (c) Ralph Meijer. # """ XMPP Jingle Protocol. This protocol is specified in * XEP-0166 - http://xmpp.org/extensions/xep-0166.html * XEP-0167 - http://xmpp.org/extensions/xep-0167.html * XEP-0176 - http://xmpp.org/extensions/xep-0176.html * XEP-0177 - http://xmpp.org/extensions/xep-0177.html """ from twisted.words.xish import domish from twisted.words.protocols.jabber import error from wokkel.generic import Request from wokkel.subprotocols import IQHandlerMixin, XMPPHandler NS_JINGLE_BASE = 'urn:xmpp:jingle' NS_JINGLE = NS_JINGLE_BASE + ':1' NS_JINGLE_ERRORS = NS_JINGLE_BASE + ':errors:1' NS_JINGLE_APPS_BASE = NS_JINGLE_BASE + ':apps' NS_JINGLE_APPS_RTP = NS_JINGLE_APPS_BASE + ':rtp:1' NS_JINGLE_APPS_RTP_INFO = NS_JINGLE_APPS_BASE + ':rtp:info:1' NS_JINGLE_APPS_RTP_AUDIO = NS_JINGLE_APPS_BASE + ':rtp:audio' NS_JINGLE_APPS_RTP_VIDEO = NS_JINGLE_APPS_BASE + ':rtp:video' +NS_JINGLE_APPS_COIN = NS_JINGLE_APPS_BASE + ':coin:1' + NS_JINGLE_ICE_UDP_TRANSPORT = NS_JINGLE_BASE + ':transports:ice-udp:1' NS_JINGLE_RAW_UDP_TRANSPORT = NS_JINGLE_BASE + ':transports:raw-udp:1' # XPath for Jingle IQ requests IQ_JINGLE_REQUEST = '/iq[@type="get" or @type="set"]/jingle[@xmlns="' + NS_JINGLE + '"]' class Parameter(object): """ A class representing a payload parameter """ def __init__(self, name, value): self.name, self.value = name, value @classmethod def fromElement(cls, element): return cls(element.getAttribute('name'), element.getAttribute('value')) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'parameter')) element['name'] = self.name element['value'] = self.value or '' return element class Crypto(object): """ A crypto method which makes up the encryption to be used """ def __init__(self, crypto_suite, key_params, tag, session_params=None): self.crypto_suite, self.key_params, self.tag, self.session_params = crypto_suite, key_params, tag, session_params @classmethod def fromElement(cls, element): return cls(element.getAttribute('crypto-suite'), element.getAttribute('key-params'), element.getAttribute('tag'), element.getAttribute('session-params')) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'crypto')) element['crypto-suite'] = self.crypto_suite element['key-params'] = self.key_params if self.session_params: element['session-params'] = self.session_params element['tag'] = self.tag return element class Encryption(object): """ A class representing encryption method """ def __init__(self, required=False, cryptos=None): self.required, self.cryptos = required, cryptos or [] @classmethod def fromElement(cls, element): cryptos = [] for child in element.elements(): if child.name == 'crypto': cryptos.append(Crypto.fromElement(child)) # TODO: parse ZRTP elements required = element.hasAttribute('required') and (element.getAttribute('required').lower() in ['true', '1']) return cls(required, cryptos) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'encryption')) if self.required: element['required'] = '1' for c in self.cryptos: element.addChild(c.toElement(defaultUri)) return element class Bandwidth(object): """ A class representing the bandwidth element """ def __init__(self, typ, value): self.typ, self.value = typ, value @classmethod def fromElement(cls, element): return cls(element.getAttribute('type'), str(element)) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'bandwidth')) element['type'] = self.typ element.addContent(self.value) return element class PayloadType(object): """ A class representing payload type """ def __init__(self, id, name, clockrate=0, channels=0, maxptime=None, ptime=None, parameters=None): self.id, self.name, self.clockrate, self.channels, \ self.maxptime, self.ptime, self.parameters = \ id, name, clockrate, channels, maxptime, ptime, parameters or [] @classmethod def fromElement(cls, element): def _sga(v, t): """ SafeGetAttribute """ try: return t(element.getAttribute(v)) except (TypeError, ValueError): return None params = [] for c in element.children: params.append(Parameter.fromElement(c)) return cls(int(element.getAttribute('id')), element.getAttribute('name'), _sga('clockrate', int) or 0, _sga('channels', int) or 0, _sga('maxptime', int) or 0, _sga('ptime', int) or 0, params) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'payload-type')) def _aiv(k, v): """ AppendIfValid """ if v: element[k] = str(v) element['id'] = str(self.id) _aiv('name', self.name) _aiv('clockrate', self.clockrate) _aiv('channels', self.channels) _aiv('maxptime', self.maxptime) _aiv('ptime', self.ptime) for p in self.parameters: element.addChild(p.toElement()) return element class ICECandidate(object): """ A class representing an ICE candidate """ def __init__(self, component, foundation, generation, id, ip, network, port, priority, protocol, typ, related_addr=None, related_port=0): self.component, self.foundation, self.generation, \ self.id, self.ip, self.network, self.port, self.priority, \ self.protocol, self.typ, self.related_addr, self.related_port = \ component, foundation, generation, \ id, ip, network, port, priority, protocol, typ, \ related_addr, related_port @classmethod def fromElement(cls, element): def _gas(*names): """ GetAttributeS """ def default_val(t): return None if t is str else t() return [(t(element.getAttribute(name)) if element.hasAttribute(name) else default_val(t)) for name, t in names] return cls(*_gas(('component', int), ('foundation', int), ('generation', int), ('id', str), ('ip', str), ('network', int), ('port', int), ('priority', int), ('protocol', str), ('type', str), ('rel-addr', str), ('rel-port', int))) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'candidate')) def _aas(*names): """ AddAttributeS """ for n, v in names: if v is not None: element[n] = str(v) _aas(*[('component', self.component), ('foundation', self.foundation), ('generation', self.generation), ('id', self.id), ('ip', self.ip), ('network', self.network), ('port', self.port), ('priority', self.priority), ('protocol', self.protocol), ('type', self.typ), ('rel-addr', self.related_addr), ('rel-port', self.related_port)]) return element class UDPCandidate(object): """ A class representing a UDP candidate """ def __init__(self, component, generation, id_, ip, port, protocol, type=None): self.component = component self.generation = generation self.id = id_ self.ip = ip self.port = port self.protocol = protocol self.type = type @classmethod def fromElement(cls, element): def _gas(*names): """ GetAttributeS """ def default_val(t): return None if t is str else t() return [(t(element.getAttribute(name)) if element.hasAttribute(name) else default_val(t)) for name, t in names] return cls(*_gas(('component', int), ('generation', int), ('id', str), ('ip', str), ('port', int), ('protocol', str), ('type', str))) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'candidate')) def _aas(*names): """ AddAttributeS """ for n, v in names: if v: element[n] = str(v) _aas(*[('component', self.component), ('generation', self.generation), ('id', self.id), ('ip', self.ip), ('port', self.port), ('protocol', self.protocol), ('type', self.type)]) return element class ICERemoteCandidate(object): """ A class represeting a remote candidate entity """ def __init__(self, component, ip, port): self.component, self.ip, self.port = component, ip, port @classmethod def fromElement(cls, element): return cls(int(element.getAttribute('component') or '0'), element.getAttribute('ip'), int(element.getAttribute('port') or '0')) def toElement(self, defaultUri=None): element = domish.Element((defaultUri, 'remote-candidate')) element['component'] = str(self.component) element['ip'] = self.ip element['port'] = str(self.port) return element class IceUdpTransport(object): """ Represents the ICE-UDP transport type """ def __init__(self, pwd=None, ufrag=None, candidates=None, remote_candidate=None): self.password, self.ufrag, self.candidates, self.remote_candidate = \ pwd, ufrag, candidates or [], remote_candidate @classmethod def fromElement(cls, element): password = element.getAttribute('pwd') or None ufrag = element.getAttribute('ufrag') or None candidates = [] remote_candidate = None for child in element.elements(): if child.name == 'remote-candidate' and remote_candidate is None: remote_candidate = ICERemoteCandidate.fromElement(child) elif child.name == 'candidate': candidates.append(ICECandidate.fromElement(child)) return cls(pwd=password, ufrag=ufrag, candidates=candidates, remote_candidate=remote_candidate) def toElement(self, defaultUri=None): element = domish.Element((defaultUri or NS_JINGLE_ICE_UDP_TRANSPORT, 'transport')) if self.password: element['pwd'] = self.password if self.ufrag: element['ufrag'] = self.ufrag if self.remote_candidate: element.addChild(self.remote_candidate.toElement()) elif self.candidates: for c in self.candidates: element.addChild(c.toElement()) return element class RawUdpTransport(object): """ Represents the Raw-UDP transport type """ def __init__(self, candidates=None): self.candidates = candidates or [] @classmethod def fromElement(cls, element): candidates = [] for child in element.elements(): if child.name == 'candidate': candidates.append(UDPCandidate.fromElement(child)) return cls(candidates=candidates) def toElement(self, defaultUri=None): element = domish.Element((defaultUri or NS_JINGLE_RAW_UDP_TRANSPORT, 'transport')) for c in self.candidates: element.addChild(c.toElement()) return element class RTPDescription(object): """ A class representing a RTP description """ def __init__(self, name=None, media=None, ssrc=None, payloads=None, encryption=None, bandwidth=None): self.name, self.media, self.ssrc, self.payloads, \ self.encryption, self.bandwidth = \ name, media, ssrc, payloads or [], encryption, bandwidth @classmethod def fromElement(cls, element): plds = [] encryption, bandwidth = None, None for child in element.elements(): if child.name == 'payload-type': plds.append(PayloadType.fromElement(child)) if child.name == 'encryption': encryption = Encryption.fromElement(child) if child.name == 'bandwidth': bandwidth = Bandwidth.fromElement(child) return cls(element.getAttribute('name'), element.getAttribute('media'), element.getAttribute('ssrc'), plds, encryption, bandwidth) def toElement(self, defaultUri=None): element = domish.Element((defaultUri or NS_JINGLE_APPS_RTP, 'description')) if self.name: element['name'] = self.name if self.media: element['media'] = self.media for p in self.payloads: element.addChild(p.toElement(defaultUri)) if self.encryption: element.addChild(self.encryption.toElement(defaultUri)) if self.bandwidth: element.addChild(self.bandwidth.toElement(defaultUri)) return element class Content(object): """ A class indicating a single content item within a jingle request. """ def __init__(self, creator, name, disposition=None, senders=None): self.creator, self.name, self.disposition, self.senders = \ creator, name, disposition, senders self.description = None self.transport = None @classmethod def fromElement(cls, element): creator = element.getAttribute('creator') name = element.getAttribute('name') disposition = element.getAttribute('disposition') senders = element.getAttribute('senders') description, transport = None, None for c in element.elements(): if c.name == 'description' and c.uri == NS_JINGLE_APPS_RTP: description = RTPDescription.fromElement(c) elif c.name == 'transport' and c.uri == NS_JINGLE_ICE_UDP_TRANSPORT: transport = IceUdpTransport.fromElement(c) elif c.name == 'transport' and c.uri == NS_JINGLE_RAW_UDP_TRANSPORT: transport = RawUdpTransport.fromElement(c) ret = cls(creator, name, disposition, senders) ret.description = description ret.transport = transport return ret def toElement(self): element = domish.Element((None, 'content')) element['creator'] = self.creator element['name'] = self.name if self.disposition: element['disposition'] = self.disposition if self.senders: element['senders'] = self.senders if self.description: element.addChild(self.description.toElement()) if self.transport: element.addChild(self.transport.toElement()) return element class EmptyType(unicode): @classmethod def fromElement(cls, element): return cls(element.name) def toElement(self): return domish.Element((None, self)) class ReasonType(EmptyType): pass class AlternativeSessionReason(unicode): def __new__(cls, value): obj = unicode.__new__(cls, 'alternative-session') obj.sid = value return obj @classmethod def fromElement(cls, element): return cls(element.firstChildElement().children[0]) def toElement(self): element = domish.Element((None, self)) element.addElement('sid', content=self.sid) return element class Reason(object): def __init__(self, reason, text=None): self.value = reason self.text = text @classmethod def fromElement(cls, element): reason = None text = None for c in element.children: if c.name == 'text': text = c.children[0] elif c.name == 'alternative-session': reason = AlternativeSessionReason.fromElement(c) else: reason = ReasonType.fromElement(c) return cls(reason, text) def toElement(self): element = domish.Element((None, 'reason')) element.addChild(self.value.toElement()) if self.text: element.addElement('text', content=self.text) return element class Info(unicode): @classmethod def fromElement(cls, element): return cls(element.name) def toElement(self): return domish.Element((NS_JINGLE_APPS_RTP_INFO, self)) class MuteInfo(Info): def __new__(cls, value, creator, name): obj = unicode.__new__(cls, value) obj.creator = creator obj.name = name return obj @classmethod def fromElement(cls, element): return cls(element.name, element['creator'], element['name']) def toElement(self): element = super(MuteInfo, self).toElement() element['creator'] = self.creator element['name'] = self.name return element +class ConferenceInfo(object): + + def __init__(self, isfocus): + self.isfocus = isfocus + + @classmethod + def fromElement(cls, element): + return cls(element.getAttribute('isfocus')) + + def toElement(self, defaultUri=None): + #element = domish.Element((defaultUri or NS_JINGLE_APPS_COIN, 'conference-info')) + # TODO: Jitsi sends an empty string here, lets do the same until they fix it + element = domish.Element(("", 'conference-info')) + element['isfocus'] = 'true' if self.isfocus else 'false' + return element + + class Jingle(object): """ A class representing a Jingle element within an IQ request """ - def __init__(self, action, sid, initiator=None, responder=None, content=None, reason=None, info=None): + def __init__(self, action, sid, initiator=None, responder=None, content=None, reason=None, info=None, conference_info=None): self.action = action self.sid = sid self.initiator = initiator self.responder = responder self.reason = reason self.info = info if not hasattr(content, '__iter__'): if content is not None: self.content = [content] else: self.content = [] else: self.content = content + self.conference_info = conference_info @classmethod def fromElement(cls, element): action = element.getAttribute('action') initiator = element.getAttribute('initiator') responder = element.getAttribute('responder') sid = element.getAttribute('sid') content = [] reason = None info = None + conference_info = None + for c in element.elements(): if c.name == 'content': content.append(Content.fromElement(c)) elif c.name == 'reason': reason = Reason.fromElement(c) elif c.uri == NS_JINGLE_APPS_RTP_INFO: if c.name in ('mute', 'unmute'): info = MuteInfo.fromElement(c) else: info = Info.fromElement(c) - return cls(action, sid, initiator, responder, content=content, reason=reason, info=info) + elif c.name == 'conference-info' and c.uri == NS_JINGLE_APPS_COIN: + conference_info = ConferenceInfo.fromElement(c) + return cls(action, sid, initiator, responder, content=content, reason=reason, info=info, conference_info=conference_info) def toElement(self): element = domish.Element((NS_JINGLE, 'jingle')) element['action'] = self.action element['sid'] = self.sid if self.initiator: element['initiator'] = self.initiator if self.responder: element['responder'] = self.responder for c in self.content: element.addChild(c.toElement()) if self.reason: element.addChild(self.reason.toElement()) if self.info: element.addChild(self.info.toElement()) + if self.conference_info: + element.addChild(self.conference_info.toElement()) return element class JingleIq(Request): stanzaKind = 'iq' stanzaType = 'set' timeout = None childParsers = {(NS_JINGLE, 'jingle'): '_parseJingleElement'} def __init__(self, sender=None, recipient=None, jingle=None): Request.__init__(self, recipient, sender, self.stanzaType) self.jingle = jingle def _parseJingleElement(self, element): self.jingle = Jingle.fromElement(element) def toElement(self): element = Request.toElement(self) element.addChild(self.jingle.toElement()) return element +class ConferenceInfoIq(Request): + stanzaKind = 'iq' + stanzaType = 'set' + timeout = None + + def __init__(self, sender=None, recipient=None, payload=None): + if not payload: + raise ValueError('conference info payload cannot be empty') + Request.__init__(self, recipient, sender, self.stanzaType) + self.payload = payload + + def toElement(self): + element = Request.toElement(self) + element.addRawXml(self.payload) + return element + + class JingleHandler(XMPPHandler, IQHandlerMixin): iqHandlers = {IQ_JINGLE_REQUEST: '_onJingleRequest'} def connectionInitialized(self): self.xmlstream.addObserver(IQ_JINGLE_REQUEST, self.handleRequest) def sessionTerminate(self, sender, recipient, sid, reason=None): jingle = Jingle('session-terminate', sid, reason=reason) return JingleIq(sender=sender, recipient=recipient, jingle=jingle) def sessionInfo(self, sender, recipient, sid, info=None): jingle = Jingle('session-info', sid, info=info) return JingleIq(sender=sender, recipient=recipient, jingle=jingle) def sessionAccept(self, sender, recipient, payload): payload.action = 'session-accept' return JingleIq(sender=sender, recipient=recipient, jingle=payload) def sessionInitiate(self, sender, recipient, payload): payload.action = 'session-initiate' return JingleIq(sender=sender, recipient=recipient, jingle=payload) def _onJingleRequest(self, iq): request = JingleIq.fromElement(iq) method_name = 'on'+''.join(item.capitalize() for item in request.jingle.action.lower().split('-')) handler = getattr(self, method_name, None) if callable(handler): handler(request) else: raise error.StanzaError('bad-request')