diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index 67d6888..ffd29e5 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,447 +1,445 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlib import coros from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.session import Session __all__ = 'ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError' SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} - def _get_started(self): + @property + def started(self): return self.__dict__['started'] - def _set_started(self, value): + @started.setter + def started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() - started = property(_get_started, _set_started) - del _get_started, _set_started - - def _get_xmpp_session(self): + @property + def xmpp_session(self): return self.__dict__['xmpp_session'] - def _set_xmpp_session(self, session): + @xmpp_session.setter + def xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) NotificationCenter().add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) - xmpp_session = property(_get_xmpp_session, _set_xmpp_session) - del _get_xmpp_session, _set_xmpp_session - @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat') notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return self.msrp_stream = MediaStreamRegistry.get('chat')() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self) else: notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started')) def enqueue_xmpp_message(self, message): self._xmpp_message_queue.append(message) if self.started: self._send_queued_messages() def _send_queued_messages(self): sender = None while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = ChatIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', sender=sender, message_id=str(message.id), notify_progress=message.use_receipt) if sender: self.msrp_stream.send_composing_indication('idle', 30, sender=sender) def _inactivity_timeout(self): log.info("Ending SIP session %s due to inactivity" % self.sip_session.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP session %s started" % self.sip_session.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.info("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP session %s failed" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.content else: html_body = message.content body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: notify_progress = False else: notify_progress = True self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', sender=sender, message_id=str(message.id), notify_progress=notify_progress) self.msrp_stream.send_composing_indication('idle', 30, sender=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, sender=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client # replies to one of our messages. MESSAGE requests don't need a Contact header, though # so how should we communicate our GRUU to the recipient? self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.uri) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index 99df9c6..50699a8 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,326 +1,327 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.audio import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import FromHeader, ToHeader from sipsimple.core import SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.session import Session __all__ = 'MediaSessionHandler', class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity - def _set_started(self, value): + @property + def started(self): + return self.__dict__['started'] + + @started.setter + def started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) - def _get_started(self): - return self.__dict__['started'] - started = property(_get_started, _set_started) - del _get_started, _set_started @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) self.sip_session.connect(from_header, to_header, route=route, streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP session %s started" % self.sip_session.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.info("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP session %s failed (%s)" % (self.sip_session.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() def _NH_SIPSessionGotConferenceInfo(self, notification): self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) def _NH_JingleSessionDidStart(self, notification): log.info("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.info("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.info("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py index 39677b5..2acfa68 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py @@ -1,442 +1,441 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19. """ from threading import RLock from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from zope.interface import implements from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError from sipsimple.streams.rtp import RTPStreamEncryption from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError __all__ = 'AudioStream', class AudioStream(object): __metaclass__ = MediaStreamRegistrar implements(IMediaStream, IAudioPort, IObserver) type = 'audio' priority = 1 hold_supported = True def __init__(self): from sipsimple.application import SIPApplication self.mixer = SIPApplication.voice_audio_mixer self.bridge = AudioBridge(self.mixer) self.device = AudioDevice(self.mixer) self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = 'NULL' self._transport = None self._hold_request = None self._ice_state = 'NULL' self._lock = RLock() self._rtp_transport = None self.session = None self.encryption = RTPStreamEncryption(self) self._srtp_encryption = None self._try_ice = False self._initialized = False self._done = False self._failure_reason = None self.bridge.add(self.device) # Audio properties # @property def codec(self): return self._transport.codec if self._transport else None @property def consumer_slot(self): return self._transport.slot if self._transport else None @property def producer_slot(self): return self._transport.slot if self._transport and not self.muted else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None - def _get_muted(self): + @property + def muted(self): return self.__dict__.get('muted', False) - def _set_muted(self, value): + @muted.setter + def muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for muted property: %r' % (value,)) if value == self.muted: return old_producer_slot = self.producer_slot self.__dict__['muted'] = value notification_center = NotificationCenter() data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot) notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data) - muted = property(_get_muted, _set_muted) - del _get_muted, _set_muted - # RTP properties # @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None @property def local_rtp_candidate_type(self): return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None @property def remote_rtp_candidate_type(self): return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None @property def ice_active(self): return self._ice_state == 'IN_USE' # Generic properties # @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote # Public methods # @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] if remote_stream.media != 'audio': raise UnknownStreamError if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'): raise InvalidStreamError('expected RTP/AVP or RTP/SAVP transport in audio stream, got %s' % remote_stream.transport) local_encryption_policy = 'sdes_optional' if local_encryption_policy == 'sdes_mandatory' and not 'crypto' in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") if remote_stream.transport == 'RTP/SAVP' and 'crypto' in remote_stream.attributes and local_encryption_policy not in ('opportunistic', 'sdes_optional', 'sdes_mandatory'): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError('no compatible codecs found') stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index return stream def initialize(self, session, direction): with self._lock: if self.state != 'NULL': raise RuntimeError('AudioStream.initialize() may only be called in the NULL state') self.state = 'INITIALIZING' self.session = session local_encryption_policy = 'sdes_optional' if hasattr(self, '_incoming_remote_sdp') and hasattr(self, '_incoming_stream_index'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates if 'zrtp-hash' in remote_stream.attributes: incoming_stream_encryption = 'zrtp' elif 'crypto' in remote_stream.attributes: incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: incoming_stream_encryption = None if incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy else: self._try_ice = True self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ['INITIALIZED', 'WAIT_ICE', 'ESTABLISHED']: raise RuntimeError('AudioStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states') if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = 'sendrecv' elif 'send' in old_direction: new_direction = ('sendonly' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'sendrecv') else: new_direction = ('inactive' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'recvonly') else: new_direction = None return self._transport.get_local_media(remote_sdp, index, new_direction) def start(self, local_sdp, remote_sdp, stream_index): with self._lock: if self.state != 'INITIALIZED': raise RuntimeError('AudioStream.start() may only be called in the INITIALIZED state') settings = SIPSimpleSettings() self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self._check_hold(self._transport.direction, True) if self._try_ice: self.state = 'WAIT_ICE' else: self.state = 'ESTABLISHED' self.notification_center.post_notification('MediaStreamDidStart', sender=self) def validate_update(self, remote_sdp, stream_index): with self._lock: # TODO: implement return True def update(self, local_sdp, remote_sdp, stream_index): with self._lock: connection = remote_sdp.media[stream_index].connection or remote_sdp.connection if not self._rtp_transport.ice_active and (connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port): settings = SIPSimpleSettings() old_consumer_slot = self.consumer_slot old_producer_slot = self.producer_slot self.notification_center.remove_observer(self, sender=self._transport) self._transport.stop() try: self._transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) except SIPCoreError as e: self.state = 'ENDED' self._failure_reason = e.args[0] self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=self._failure_reason)) return self.notification_center.add_observer(self, sender=self._transport) self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True, old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv': self._transport.update_direction('recvonly') self._check_hold(self._transport.direction, False) self.notification_center.post_notification('RTPStreamDidChangeRTPParameters', sender=self) else: new_direction = local_sdp.media[stream_index].direction self._transport.update_direction(new_direction) self._check_hold(new_direction, False) self._hold_request = None def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == 'ESTABLISHED' and self.direction != 'inactive': self.bridge.remove(self) self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == 'ESTABLISHED' and self._hold_request == 'hold': self.bridge.add(self) self._hold_request = None if self._hold_request == 'hold' else 'unhold' def deactivate(self): with self._lock: self.bridge.stop() def end(self): with self._lock: if not self._initialized or self._done: return self._done = True self.notification_center.post_notification('MediaStreamWillEnd', sender=self) if self._transport is not None: self._transport.stop() self.notification_center.remove_observer(self, sender=self._transport) self._transport = None self.notification_center.remove_observer(self, sender=self._rtp_transport) self._rtp_transport = None self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) self.session = None def reset(self, stream_index): with self._lock: if self.direction == 'inactive' and not self.on_hold_by_local: new_direction = 'sendrecv' self._transport.update_direction(new_direction) self._check_hold(new_direction, False) # TODO: do a full reset, re-creating the AudioTransport, so that a new offer # would contain all codecs and ICE would be renegotiated -Saul def send_dtmf(self, digit): with self._lock: if self.state != 'ESTABLISHED': raise RuntimeError('AudioStream.send_dtmf() cannot be used in %s state' % self.state) try: self._transport.send_dtmf(digit) except PJSIPError as e: if not e.args[0].endswith('(PJ_ETOOMANY)'): raise # Notification handling # def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPTransportDidFail(self, notification): with self._lock: self.notification_center.remove_observer(self, sender=notification.sender) if self.state == 'ENDED': return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportDidInitialize(self, notification): settings = SIPSimpleSettings() rtp_transport = notification.sender with self._lock: if self.state == 'ENDED': return del self._rtp_args del self._stun_servers try: if hasattr(self, '_incoming_remote_sdp') and hasattr(self, '_incoming_stream_index'): try: audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) finally: del self._incoming_remote_sdp del self._incoming_stream_index else: audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) except SIPCoreError as e: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = audio_transport self.notification_center.add_observer(self, sender=audio_transport) self._initialized = True self.state = 'INITIALIZED' self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPAudioStreamGotDTMF(self, notification): self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit)) def _NH_RTPAudioTransportDidTimeout(self, notification): self.notification_center.post_notification('RTPStreamDidTimeout', sender=self) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != 'NULL' or self.state not in ('INITIALIZING', 'INITIALIZED', 'WAIT_ICE'): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'IN_USE' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'FAILED' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods # def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args['encryption'] = self._srtp_encryption self._rtp_args['use_ice'] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() try: rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) except SIPCoreError as e: self._try_next_rtp_transport(e.args[0]) else: self.notification_center.add_observer(self, sender=rtp_transport) try: rtp_transport.set_INIT() except SIPCoreError as e: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _check_hold(self, direction, is_initial): was_on_hold_by_local = self.on_hold_by_local was_on_hold_by_remote = self.on_hold_by_remote was_inactive = self.direction == 'inactive' self.direction = direction inactive = self.direction == 'inactive' self.on_hold_by_local = was_on_hold_by_local if inactive else direction == 'sendonly' self.on_hold_by_remote = 'send' not in direction if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold': self.bridge.add(self) if not was_on_hold_by_local and self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=True)) if was_on_hold_by_local and not self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=False)) if not was_on_hold_by_remote and self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=True)) if was_on_hold_by_remote and not self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=False)) diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py index 2a3b3e0..f250137 100644 --- a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py @@ -1,283 +1,284 @@ import hashlib from twisted.words.xish import domish from sylk import __version__ as SYLK_VERSION from sylk.applications.xmppgateway.util import html2text CHATSTATES_NS = 'http://jabber.org/protocol/chatstates' RECEIPTS_NS = 'urn:xmpp:receipts' STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas' XML_NS = 'http://www.w3.org/XML/1998/namespace' MUC_NS = 'http://jabber.org/protocol/muc' MUC_USER_NS = MUC_NS + '#user' CAPS_NS = 'http://jabber.org/protocol/caps' SYLK_HASH = hashlib.sha1('SylkServer-%s' % SYLK_VERSION).hexdigest() SYLK_CAPS = [] class BaseStanza(object): stanza_type = None # to be defined by subclasses type = None def __init__(self, sender, recipient, id=None): self.sender = sender self.recipient = recipient self.id = id def to_xml_element(self): xml_element = domish.Element((None, self.stanza_type)) xml_element['from'] = unicode(self.sender.uri.as_xmpp_jid()) xml_element['to'] = unicode(self.recipient.uri.as_xmpp_jid()) if self.type: xml_element['type'] = self.type if self.id is not None: xml_element['id'] = self.id return xml_element class ErrorStanza(object): """ Stanza representing an error of another stanza. It's not a base stanza type on its own. """ def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None): self.stanza_type = stanza_type self.sender = sender self.recipient = recipient self.id = id self.conditions = conditions self.error_type = error_type @classmethod def from_stanza(cls, stanza, error_type, conditions): # In error stanzas sender and recipient are swapped return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id) def to_xml_element(self): xml_element = domish.Element((None, self.stanza_type)) xml_element['from'] = unicode(self.sender.uri.as_xmpp_jid()) xml_element['to'] = unicode(self.recipient.uri.as_xmpp_jid()) xml_element['type'] = 'error' if self.id is not None: xml_element['id'] = self.id error_element = domish.Element((None, 'error')) error_element['type'] = self.error_type [error_element.addChild(domish.Element((ns, condition))) for condition, ns in self.conditions] xml_element.addChild(error_element) return xml_element class BaseMessageStanza(BaseStanza): stanza_type = 'message' def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False): super(BaseMessageStanza, self).__init__(sender, recipient, id=id) self.use_receipt = use_receipt if body is not None and html_body is None: self.body = body self.html_body = None elif body is None and html_body is not None: self.body = html2text(html_body) self.html_body = html_body else: self.body = body self.html_body = html_body def to_xml_element(self): xml_element = super(BaseMessageStanza, self).to_xml_element() if self.id is not None and self.recipient.uri.resource is not None and self.use_receipt: xml_element.addElement('request', defaultUri=RECEIPTS_NS) if self.body is not None: xml_element.addElement('body', content=self.body) if self.html_body is not None: xml_element.addElement('html', content=self.html_body) return xml_element class NormalMessage(BaseMessageStanza): def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False): if body is None and html_body is None: raise ValueError('either body or html_body need to be set') super(NormalMessage, self).__init__(sender, recipient, body, html_body, id, use_receipt) class ChatMessage(BaseMessageStanza): type = 'chat' def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=True): if body is None and html_body is None: raise ValueError('either body or html_body need to be set') super(ChatMessage, self).__init__(sender, recipient, body, html_body, id, use_receipt) def to_xml_element(self): xml_element = super(ChatMessage, self).to_xml_element() xml_element.addElement('active', defaultUri=CHATSTATES_NS) return xml_element class ChatComposingIndication(BaseMessageStanza): type = 'chat' def __init__(self, sender, recipient, state, id=None, use_receipt=False): super(ChatComposingIndication, self).__init__(sender, recipient, id=id, use_receipt=use_receipt) self.state = state def to_xml_element(self): xml_element = super(ChatComposingIndication, self).to_xml_element() xml_element.addElement(self.state, defaultUri=CHATSTATES_NS) return xml_element class GroupChatMessage(BaseMessageStanza): type = 'groupchat' def __init__(self, sender, recipient, body=None, html_body=None, id=None): # TODO: add timestamp if body is None and html_body is None: raise ValueError('either body or html_body need to be set') super(GroupChatMessage, self).__init__(sender, recipient, body, html_body, id, False) class MessageReceipt(BaseMessageStanza): def __init__(self, sender, recipient, receipt_id, id=None): super(MessageReceipt, self).__init__(sender, recipient, id=id, use_receipt=False) self.receipt_id = receipt_id def to_xml_element(self): xml_element = super(MessageReceipt, self).to_xml_element() receipt_element = domish.Element((RECEIPTS_NS, 'received')) receipt_element['id'] = self.receipt_id xml_element.addChild(receipt_element) return xml_element class BasePresenceStanza(BaseStanza): stanza_type = 'presence' class IncomingInvitationMessage(BaseMessageStanza): def __init__(self, sender, recipient, invited_user, reason=None, id=None): super(IncomingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False) self.invited_user = invited_user self.reason = reason def to_xml_element(self): xml_element = super(IncomingInvitationMessage, self).to_xml_element() child = xml_element.addElement((MUC_USER_NS, 'x')) child.addElement('invite') child.invite['to'] = unicode(self.invited_user.uri.as_xmpp_jid()) if self.reason: child.invite.addElement('reason', content=self.reason) return xml_element class OutgoingInvitationMessage(BaseMessageStanza): def __init__(self, sender, recipient, originator, reason=None, id=None): super(OutgoingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False) self.originator = originator self.reason = reason def to_xml_element(self): xml_element = super(OutgoingInvitationMessage, self).to_xml_element() child = xml_element.addElement((MUC_USER_NS, 'x')) child.addElement('invite') child.invite['from'] = unicode(self.originator.uri.as_xmpp_jid()) if self.reason: child.invite.addElement('reason', content=self.reason) return xml_element class AvailabilityPresence(BasePresenceStanza): def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None): super(AvailabilityPresence, self).__init__(sender, recipient, id=id) self.available = available self.show = show self.priority = priority self.statuses = statuses or {} - def _get_available(self): + @property + def available(self): return self.__dict__['available'] - def _set_available(self, available): + + @available.setter + def available(self, available): if available: self.type = None else: self.type = 'unavailable' self.__dict__['available'] = available - available = property(_get_available, _set_available) - del _get_available, _set_available @property def status(self): status = self.statuses.get(None) if status is None: try: status = next(self.statuses.itervalues()) except StopIteration: pass return status def to_xml_element(self): xml_element = super(BasePresenceStanza, self).to_xml_element() if self.available: if self.show is not None: xml_element.addElement('show', content=self.show) if self.priority != 0: xml_element.addElement('priority', content=unicode(self.priority)) caps = xml_element.addElement('c', defaultUri=CAPS_NS) caps['node'] = 'http://sylkserver.com' caps['hash'] = 'sha-1' caps['ver'] = SYLK_HASH if SYLK_CAPS: caps['ext'] = ' '.join(SYLK_CAPS) for lang, text in self.statuses.iteritems(): status = xml_element.addElement('status', content=text) if lang: status[(XML_NS, 'lang')] = lang return xml_element class SubscriptionPresence(BasePresenceStanza): def __init__(self, sender, recipient, type, id=None): super(SubscriptionPresence, self).__init__(sender, recipient, id=id) self.type = type class ProbePresence(BasePresenceStanza): type = 'probe' class MUCAvailabilityPresence(AvailabilityPresence): def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None, affiliation=None, jid=None, role=None, muc_statuses=None): super(MUCAvailabilityPresence, self).__init__(sender, recipient, available, show, statuses, priority, id) self.affiliation = affiliation or 'member' self.role = role or 'participant' self.muc_statuses = muc_statuses or [] self.jid = jid def to_xml_element(self): xml_element = super(MUCAvailabilityPresence, self).to_xml_element() muc = xml_element.addElement('x', defaultUri=MUC_USER_NS) item = muc.addElement('item') if self.affiliation: item['affiliation'] = self.affiliation if self.role: item['role'] = self.role if self.jid: item['jid'] = unicode(self.jid.uri.as_xmpp_jid()) for code in self.muc_statuses: status = muc.addElement('status') status['code'] = code return xml_element class MUCErrorPresence(ErrorStanza): def to_xml_element(self): xml_element = super(MUCErrorPresence, self).to_xml_element() xml_element.addElement('x', defaultUri=MUC_USER_NS) return xml_element diff --git a/sylk/applications/xmppgateway/xmpp/subscription.py b/sylk/applications/xmppgateway/xmpp/subscription.py index ac70b8c..55f19c0 100644 --- a/sylk/applications/xmppgateway/xmpp/subscription.py +++ b/sylk/applications/xmppgateway/xmpp/subscription.py @@ -1,199 +1,201 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlib import coros, proc from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import SubscriptionPresence, ProbePresence, AvailabilityPresence __all__ = 'XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager' class XMPPSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() - def _set_state(self, new_state): + @property + def state(self): + return self.__dict__['state'] + + @state.setter + def state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: NotificationCenter().post_notification('XMPPSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) - def _get_state(self): - return self.__dict__['state'] - state = property(_get_state, _set_state) - del _get_state, _set_state def start(self): NotificationCenter().post_notification('XMPPSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) self.subscribe() def end(self): if self.state == 'terminated': return self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def subscribe(self): self.state = 'subscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') self.xmpp_manager.send_stanza(stanza) # If we are already subscribed we may not receive an answer, send a probe just in case self._send_probe() def unsubscribe(self): self.state = 'unsubscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') self.xmpp_manager.send_stanza(stanza) def _send_probe(self): self.state = 'subscribe_sent' stanza = ProbePresence(self.local_identity, self.remote_identity) self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, AvailabilityPresence): if self.state == 'subscribe_sent': self.state = 'active' notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=NotificationData(presence=item)) elif isinstance(item, SubscriptionPresence): if self.state == 'subscribe_sent' and item.type == 'subscribed': self.state = 'active' elif item.type == 'unsubscribed': prev_state = self.state self.state = 'terminated' if prev_state in ('active', 'unsubscribe_sent'): notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self) else: notification_center.post_notification('XMPPSubscriptionDidFail', sender=self) break self._proc = None class XMPPIncomingSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() - def _set_state(self, new_state): + @property + def state(self): + return self.__dict__['state'] + + @state.setter + def state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: NotificationCenter().post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) - def _get_state(self): - return self.__dict__['state'] - state = property(_get_state, _set_state) - del _get_state, _set_state def start(self): NotificationCenter().post_notification('XMPPIncomingSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) def end(self): if self.state == 'terminated': return self.state = 'terminated' self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) def accept(self): self.state = 'active' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') self.xmpp_manager.send_stanza(stanza) def reject(self): self.state = 'terminating' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') self.xmpp_manager.send_stanza(stanza) self.end() def send_presence(self, stanza): self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, SubscriptionPresence): if item.type == 'subscribe': notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self) elif item.type == 'unsubscribe': self.state = 'terminated' notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self) notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) break elif isinstance(item, ProbePresence): notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self) self._proc = None class XMPPSubscriptionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming_subscriptions = {} self.outgoing_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPSubscriptionDidStart') notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPSubscriptionDidStart(self, notification): subscription = notification.sender self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPSubscriptionDidEnd(self, notification): subscription = notification.sender del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] def _NH_XMPPIncomingSubscriptionDidStart(self, notification): subscription = notification.sender self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] diff --git a/sylk/bonjour.py b/sylk/bonjour.py index 354e184..e63c688 100644 --- a/sylk/bonjour.py +++ b/sylk/bonjour.py @@ -1,258 +1,257 @@ import uuid from application import log from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib import api, coros, proc from eventlib.green import select from sipsimple.account.bonjour import _bonjour, BonjourPresenceState, BonjourRegistrationFile from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from threading import Lock from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount class RestartSelect(Exception): pass class BonjourService(object): implements(IObserver) def __init__(self, service='sipfocus', name='SylkServer', uri_user=None, is_focus=True): self.account = DefaultAccount() self.service = service self.name = name self.uri_user = uri_user self.is_focus = is_focus self.id = str(uuid.uuid4()) self._stopped = True self._files = [] self._command_channel = coros.queue() self._select_proc = None self._register_timer = None self._update_timer = None self._lock = Lock() self.__dict__['presence_state'] = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) self._activate() @run_in_green_thread def stop(self): self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def _activate(self): self._stopped = False self._command_channel.send(Command('register')) def _deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._stopped = True def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) - def _get_presence_state(self): + @property + def presence_state(self): return self.__dict__['presence_state'] - def _set_presence_state(self, state): + @presence_state.setter + def presence_state(self, state): if state is not None and not isinstance(state, BonjourPresenceState): raise ValueError("state must be a %s instance or None" % BonjourPresenceState.__name__) with self._lock: old_state = self.__dict__['presence_state'] self.__dict__['presence_state'] = state if state != old_state: call_in_twisted_thread(self.update_registrations) - presence_state = property(_get_presence_state, _set_presence_state) - del _get_presence_state, _set_presence_state - def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self, data=NotificationData(name=name, transport=file.transport)) else: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if not self._stopped: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() for transport in missing_transports: notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport)) try: contact_uri = self.account.contact[transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') file = _bonjour.DNSServiceRegister(name=str(contact_uri), regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'), port=contact_uri.port, callBack=self._register_cb, txtRecord=_bonjour.TXTRecord(items=txtdata)) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact_uri = self.account.contact[file.transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self, data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.exception() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = self._files self._files = [] self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_NetworkConditionsDidChange(self, notification): if self._files: self.restart_registration()