diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py index 6449958..611c4d4 100644 --- a/sylk/applications/__init__.py +++ b/sylk/applications/__init__.py @@ -1,319 +1,319 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # __all__ = ['ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger'] import abc import os import socket import struct import sys from application import log from application.configuration.datatypes import NetworkRange from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from itertools import chain from sipsimple.threading import run_in_twisted_thread from zope.interface import implements from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig SYLK_APP_HEADER = 'X-Sylk-App' class ApplicationRegistry(object): __metaclass__ = Singleton def __init__(self): self.applications = [] def __iter__(self): return iter(self.applications) def find_application(self, name): try: return next(app for app in self.applications if app.__appname__ == name) except StopIteration: return None def add(self, app): if app not in self.applications: self.applications.append(app) class ApplicationName(object): def __get__(self, obj, objtype): name = objtype.__name__ return name[:-11].lower() if name.endswith('Application') else name.lower() class SylkApplicationMeta(abc.ABCMeta, Singleton): """Metaclass for defining SylkServer applications: a Singleton that also adds them to the application registry""" def __init__(cls, name, bases, dic): super(SylkApplicationMeta, cls).__init__(name, bases, dic) if name != 'SylkApplication': ApplicationRegistry().add(cls) class SylkApplication(object): """Base class for all SylkServer applications""" __metaclass__ = SylkApplicationMeta __appname__ = ApplicationName() @abc.abstractmethod def start(self): pass @abc.abstractmethod def stop(self): pass @abc.abstractmethod def incoming_session(self, session): pass @abc.abstractmethod def incoming_subscription(self, subscribe_request, data): pass @abc.abstractmethod def incoming_referral(self, refer_request, data): pass @abc.abstractmethod def incoming_message(self, message_request, data): pass def load_builtin_applications(): toplevel = os.path.dirname(__file__) app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] for module in ['sylk.applications.%s' % item for item in set(app_list).difference(ServerConfig.disabled_applications)]: __import__(module) def load_extra_applications(): if ServerConfig.extra_applications_dir: toplevel = os.path.realpath(os.path.abspath(ServerConfig.extra_applications_dir.normalized)) if os.path.isdir(toplevel): app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] sys.path.append(toplevel) for module in (item for item in set(app_list).difference(ServerConfig.disabled_applications)): __import__(module) def load_applications(): load_builtin_applications() load_extra_applications() [app() for app in ApplicationRegistry()] class ApplicationNotLoadedError(Exception): pass class IncomingRequestHandler(object): """ Handle incoming requests and match them to applications. """ __metaclass__ = Singleton implements(IObserver) def __init__(self): load_applications() registry = ApplicationRegistry() log.msg('Loaded applications: %s' % ', '.join([app.__appname__ for app in registry])) default_application = registry.find_application(ServerConfig.default_application) if default_application is None: log.warning('Default application "%s" does not exist, falling back to "conference"' % ServerConfig.default_application) ServerConfig.default_application = 'conference' else: log.msg('Default application: %s' % ServerConfig.default_application) self.application_map = dict((item.split(':')) for item in ServerConfig.application_map) self.authorization_handler = AuthorizationHandler() def start(self): [app().start() for app in ApplicationRegistry()] self.authorization_handler.start() notification_center = NotificationCenter() notification_center.add_observer(self, name='SIPSessionNewIncoming') notification_center.add_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.add_observer(self, name='SIPIncomingReferralGotRefer') notification_center.add_observer(self, name='SIPIncomingRequestGotRequest') def stop(self): self.authorization_handler.stop() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SIPSessionNewIncoming') notification_center.remove_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.remove_observer(self, name='SIPIncomingReferralGotRefer') notification_center.remove_observer(self, name='SIPIncomingRequestGotRequest') [app().stop() for app in ApplicationRegistry()] def get_application(self, ruri, headers): if SYLK_APP_HEADER in headers: application = headers[SYLK_APP_HEADER].body.strip() else: application = ServerConfig.default_application if self.application_map: prefixes = ("%s@%s" % (ruri.user, ruri.host), ruri.host, ruri.user) for prefix in prefixes: if prefix in self.application_map: application = self.application_map[prefix] break try: app = next(app for app in ApplicationRegistry() if app.__appname__ == application) except StopIteration: log.error('Application %s is not loaded' % application) raise ApplicationNotLoadedError else: return app() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewIncoming(self, notification): session = notification.sender try: self.authorization_handler.authorize_source(session.peer_address.ip) except UnauthorizedRequest: session.reject(403) return try: - app = self.get_application(session._invitation.request_uri, notification.data.headers) + app = self.get_application(session.request_uri, notification.data.headers) except ApplicationNotLoadedError: session.reject(404) else: app.incoming_session(session) def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification): subscribe_request = notification.sender try: self.authorization_handler.authorize_source(subscribe_request.peer_address.ip) except UnauthorizedRequest: subscribe_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: subscribe_request.reject(404) else: app.incoming_subscription(subscribe_request, notification.data) def _NH_SIPIncomingReferralGotRefer(self, notification): refer_request = notification.sender try: self.authorization_handler.authorize_source(refer_request.peer_address.ip) except UnauthorizedRequest: refer_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: refer_request.reject(404) else: app.incoming_referral(refer_request, notification.data) def _NH_SIPIncomingRequestGotRequest(self, notification): request = notification.sender if notification.data.method != 'MESSAGE': request.answer(405) return try: self.authorization_handler.authorize_source(request.peer_address.ip) except UnauthorizedRequest: request.answer(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: request.answer(404) else: app.incoming_message(request, notification.data) class UnauthorizedRequest(Exception): pass class AuthorizationHandler(object): implements(IObserver) def __init__(self): self.state = None self.trusted_peers = SIPConfig.trusted_peers self.thor_nodes = [] @property def trusted_parties(self): if ThorNodeConfig.enabled: return self.thor_nodes return self.trusted_peers def start(self): NotificationCenter().add_observer(self, name='ThorNetworkGotUpdate') self.state = 'started' def stop(self): self.state = 'stopped' NotificationCenter().remove_observer(self, name='ThorNetworkGotUpdate') def authorize_source(self, ip_address): if self.state != 'started': raise UnauthorizedRequest for range in self.trusted_parties: if struct.unpack('!L', socket.inet_aton(ip_address))[0] & range[1] == range[0]: return True raise UnauthorizedRequest @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_ThorNetworkGotUpdate(self, notification): thor_nodes = [] for node in chain(*(n.nodes for n in notification.data.networks.values())): thor_nodes.append(NetworkRange(node)) self.thor_nodes = thor_nodes class ApplicationLogger(object): __metaclass__ = Singleton @classmethod def for_package(cls, package): return cls(package.split('.')[-1]) def __init__(self, prefix): self.prefix = '[%s] ' % prefix def info(self, message, **context): log.info(self.prefix+message, **context) def warning(self, message, **context): log.warning(self.prefix+message, **context) def debug(self, message, **context): log.debug(self.prefix+message, **context) def error(self, message, **context): log.error(self.prefix+message, **context) def critical(self, message, **context): log.critical(self.prefix+message, **context) def exception(self, message=None, **context): if message is not None: message = self.prefix+message log.exception(message, **context) # Some aliases that are commonly used msg = info warn = warning fatal = critical err = exception diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index 850057e..f3f21ab 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,437 +1,437 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import mimetypes import os import re import shutil from application.notification import IObserver, NotificationCenter from application.python import Null from gnutls.interfaces.twisted import X509Credentials from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup from sipsimple.streams import AudioStream from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications import SylkApplication from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.applications.conference.room import Room from sylk.applications.conference.web import ScreenSharingWebServer from sylk.bonjour import BonjourServices from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.session import Session, IllegalStateError from sylk.streams import ChatStream from sylk.tls import Certificate, PrivateKey class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self._rooms = {} self.invited_participants_map = {} self.bonjour_focus_service = Null self.bonjour_room_service = Null self.screen_sharing_web_server = None def start(self): # cleanup old files for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir): try: shutil.rmtree(path) except EnvironmentError: pass if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference': self.bonjour_focus_service = BonjourServices(service='sipfocus') self.bonjour_focus_service.start() log.msg("Bonjour publication started for service 'sipfocus'") self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference') self.bonjour_room_service.start() self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants') log.msg("Bonjour publication started for service 'sipuri'") self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir) if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None: cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized) key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized) credentials = X509Credentials(cert, key) else: credentials = None self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials) listen_address = self.screen_sharing_web_server.listener.getHost() log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port)) def stop(self): self.bonjour_focus_service.stop() self.bonjour_room_service.stop() self.screen_sharing_web_server.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported') session.reject(488) return try: - self.validate_acl(session._invitation.request_uri, session.remote_identity.uri) + self.validate_acl(session.request_uri, session.remote_identity.uri) except ACLValidationError: log.msg(u'Session rejected: unauthorized by access list') session.reject(403) return # Check if requested files belong to this room for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'): try: - room = self.get_room(session._invitation.request_uri) + room = self.get_room(session.request_uri) except RoomNotFoundError: log.msg(u'Session rejected: room not found') session.reject(404) return try: file = next(file for file in room.files if file.hash == stream.file_selector.hash) except StopIteration: log.msg(u'Session rejected: requested file not found') session.reject(404) return filename = os.path.basename(file.name) for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)): if filename in filenames: path = os.path.join(dirpath, filename) stream.file_selector.fd = open(path, 'r') if stream.file_selector.size is None: stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size if stream.file_selector.type is None: mime_type, encoding = mimetypes.guess_type(filename) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' stream.file_selector.type = type break else: # File got removed from the filesystem log.msg(u'Session rejected: requested file removed from the filesystem') session.reject(404) return NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if subscribe_request.event != 'conference': log.msg(u'Subscription rejected: only conference event is supported') subscribe_request.reject(489) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): log.msg(u'Subscription rejected: unauthorized by access list') subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: log.msg(u'Subscription rejected: room not yet created') subscribe_request.reject(480) return if not room.started: log.msg(u'Subscription rejected: room not started yet') subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri)) try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri) refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_message(self, message_request, data): log.msg(u'SIP MESSAGE is not supported, use MSRP media instead') message_request.answer(405) def accept_session(self, session, streams): if session.state == 'incoming': try: session.accept(streams, is_focus=True) except IllegalStateError: pass def add_participant(self, session, room_uri): # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri)) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 NotificationCenter().add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri)) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender - room = self.get_room(session._invitation.request_uri, True) # FIXME + room = self.get_room(session.request_uri, True) room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) if session.direction == 'incoming': - room_uri = session._invitation.request_uri # FIXME + room_uri = session.request_uri else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) log.msg(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason)) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.request_uri self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host) self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'INVITE': self._refer_request.accept() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'BYE': log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = DefaultAccount() conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return else: active_media = room.active_media if not active_media: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return if 'audio' in active_media: self.streams.append(AudioStream()) if 'chat' in active_media: self.streams.append(ChatStream()) self.session = Session(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity)) route = notification.data.result[0] self.session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) log.msg('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/ircconference/__init__.py b/sylk/applications/ircconference/__init__.py index 59ad8ea..941c8fa 100644 --- a/sylk/applications/ircconference/__init__.py +++ b/sylk/applications/ircconference/__init__.py @@ -1,100 +1,100 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.ircconference.logger import log from sylk.applications.ircconference.room import IRCRoom class IRCConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self.rooms = set() self.pending_sessions = [] def start(self): pass def stop(self): pass def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject(488) return self.pending_sessions.append(session) NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() if chat_streams: for stream in chat_streams: # Disable chatroom capabilities other than nickname stream.chatroom_capabilities = ['nickname'] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): to_header = data.headers.get('To', Null) if to_header is Null: subscribe_request.reject(400) return room = IRCRoom.get_room(data.request_uri) if not room.started: room = IRCRoom.get_room(to_header.uri) if not room.started: subscribe_request.reject(480) return room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, request, data): request.reject(405) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) - room = IRCRoom.get_room(session._invitation.request_uri) # FIXME + room = IRCRoom.get_room(session.request_uri) room.start() room.add_session(session) self.rooms.add(room) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) NotificationCenter().remove_observer(self, sender=session) - room = IRCRoom.get_room(session._invitation.request_uri) # FIXME + room = IRCRoom.get_room(session.request_uri) if session in room.sessions: # We could get this notifiction even if we didn't get SIPSessionDidStart room.remove_session(session) if room.empty: room.stop() try: self.rooms.remove(room) except KeyError: pass def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) diff --git a/sylk/applications/playback/__init__.py b/sylk/applications/playback/__init__.py index 59d13c0..fe5d116 100644 --- a/sylk/applications/playback/__init__.py +++ b/sylk/applications/playback/__init__.py @@ -1,129 +1,127 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.python import Null from application.notification import IObserver, NotificationCenter from eventlib import proc from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication, ApplicationLogger from sylk.applications.playback.configuration import get_config log = ApplicationLogger.for_package(__package__) class PlaybackApplication(SylkApplication): implements(IObserver) def start(self): pass def stop(self): pass def incoming_session(self, session): log.msg('Incoming session %s from %s to %s' % (session.call_id, session.remote_identity.uri, session.local_identity.uri)) - ruri = session._invitation.request_uri - config = get_config('%s@%s' % (ruri.user, ruri.host)) + config = get_config('%s@%s' % (session.request_uri.user, session.request_uri.host)) if config is None: - log.msg(u'Session %s rejected: no configuration found for %s' % (session.call_id, ruri)) + log.msg(u'Session %s rejected: no configuration found for %s' % (session.call_id, session.request_uri)) session.reject(488) return stream_types = {'audio'} if config.enable_video: stream_types.add('video') streams = [stream for stream in session.proposed_streams if stream.type in stream_types] if not streams: log.msg(u'Session %s rejected: invalid media, only RTP audio and video is supported' % session.call_id) session.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) session.send_ring_indication() reactor.callLater(config.answer_delay, self._accept_session, session, streams) def _accept_session(self, session, streams): if session.state == 'incoming': session.accept(streams) def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, request, data): request.reject(405) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_green_thread def _NH_SIPSessionDidStart(self, notification): session = notification.sender log.msg('Session %s started' % session.call_id) handler = PlaybackHandler(session) handler.run() def _NH_SIPSessionDidFail(self, notification): session = notification.sender log.msg('Session %s failed' % session.call_id) NotificationCenter().remove_observer(self, sender=session) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session %s ended' % session.call_id) NotificationCenter().remove_observer(self, sender=session) def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender session.reject_proposal() class PlaybackHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.proc = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) def run(self): self.proc = proc.spawn(self._play) def _play(self): - ruri = self.session._invitation.request_uri - config = get_config('%s@%s' % (ruri.user, ruri.host)) + config = get_config('%s@%s' % (self.session.request_uri.user, self.session.request_uri.host)) audio_stream = self.session.streams[0] player = WavePlayer(audio_stream.mixer, config.file) audio_stream.bridge.add(player) log.msg(u"Playing file %s for session %s" % (config.file, self.session.call_id)) try: player.play().wait() except (ValueError, WavePlayerError), e: log.warning(u"Error playing file %s: %s" % (config.file, e)) except proc.ProcExit: pass finally: player.stop() self.proc = None audio_stream.bridge.remove(player) self.session.end() self.session = None def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self.proc: self.proc.kill() diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index 658a75b..3e461e0 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,540 +1,540 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI, SIPCoreError from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler from sylk.applications.xmppgateway.media import MediaSessionHandler from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage class XMPPGatewayApplication(SylkApplication): implements(IObserver) def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() self.media_sessions = set() self.s2x_muc_sessions = {} self.x2s_muc_sessions = {} self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} self.s2x_muc_add_participant_handlers = {} self.x2s_muc_add_participant_handlers = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.start() def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.stop() def incoming_session(self, session): stream_types = set([stream.type for stream in session.proposed_streams]) if 'chat' in stream_types: log.msg('New chat session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_chat_session(session) elif 'audio' in stream_types: log.msg('New audio session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_media_session(session) else: log.msg('New session from %s to %s rejected. Unsupported media: %s ' % (session.remote_identity.uri, session.local_identity.uri, stream_types)) session.reject(488) def incoming_chat_session(self, session): # Check if this session is really an invitation to add a participant to a conference room / muc if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters: try: referred_by_uri = SIPURI.parse(session.transfer_info.referred_by) except SIPCoreError: log.msg("SIP multiparty session invitation %s failed: invalid Referred-By header" % session.call_id) session.reject(488) return muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host) inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host) sender = Identity(muc_uri) recipient = Identity(recipient_uri) inviter = Identity(inviter_uri) try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] except KeyError: handler = S2XMucInvitationHandler(session, sender, recipient, inviter) self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() else: log.msg("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session.call_id, format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'))) session.reject(480) return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: log.msg('Session rejected: From domain is not a local XMPP domain') session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side - request_uri = session._invitation.request_uri + request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one log.msg('Session rejected: other session with same identifiers in progress') session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) NotificationCenter().add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session def incoming_media_session(self, session): if session.remote_identity.uri.host not in self.xmpp_manager.domains|self.xmpp_manager.muc_domains: log.msg('Session rejected: From domain is not a local XMPP domain') session.reject(403) return handler = MediaSessionHandler.new_from_sip_session(session) if handler is not None: NotificationCenter().add_observer(self, sender=handler) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return log.msg('SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp'))) if subscribe_request.event != 'presence': log.msg('SIP subscription rejected: only presence event is supported') subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: log.msg('SIP subscription rejected: From domain is not a local XMPP domain') subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.msg('New SIP Message from %s to %s' % (from_header.uri, to_header.uri)) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: log.msg('Message rejected: From domain is not a local XMPP domain') message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMMessage.parse(data.body) except CPIMParserError: log.msg('Message rejected: CPIM parse error') message_request.answer(400) return else: body = cpim_message.body content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if content_type == 'text/plain': html_body = None else: html_body = body body = None if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.enqueue_xmpp_message(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() def _NH_XMPPGotMucJoinRequest(self, notification): stanza = notification.data.stanza muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) nickname = stanza.recipient.uri.resource try: handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)] except KeyError: xmpp_identity = stanza.sender sip_identity = stanza.recipient sip_identity.uri = muc_uri handler = X2SMucHandler(sip_identity, xmpp_identity, nickname) handler._first_stanza = stanza notification.center.add_observer(self, sender=handler) handler.start() # Check if there was a pending join request on the SIP side try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))] except KeyError: pass else: handler.stop() def _NH_XMPPGotMucAddParticipantRequest(self, notification): sender = notification.data.sender recipient = notification.data.recipient participant = notification.data.participant muc_uri = FrozenURI(recipient.uri.user, recipient.uri.host) sender_uri = FrozenURI(sender.uri.user, sender.uri.host) participant_uri = FrozenURI(participant.uri.user, participant.uri.host) sender = Identity(sender_uri) recipient = Identity(muc_uri) participant = Identity(participant_uri) try: handler = self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] except KeyError: handler = X2SMucInvitationHandler(sender, recipient, participant) self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.msg('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason)) notification.center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) # MUC handling def _NH_X2SMucHandlerDidStart(self, notification): handler = notification.sender log.msg('Multiparty session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SMucHandlerDidEnd(self, notification): handler = notification.sender log.msg('Multiparty session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidStart(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s invited %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) def _NH_X2SMucInvitationHandlerDidEnd(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidFail(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'), notification.data.failure)) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidStart(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg("%s invited %s to multiparty chat %s" % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) def _NH_S2XMucInvitationHandlerDidEnd(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidFail(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'), str(notification.data.failure))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) # Media sessions def _NH_JingleSessionNewIncoming(self, notification): session = notification.sender handler = MediaSessionHandler.new_from_jingle_session(session) if handler is not None: notification.center.add_observer(self, sender=handler) def _NH_MediaSessionHandlerDidStart(self, notification): handler = notification.sender log.msg('Media session started sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.add(handler) def _NH_MediaSessionHandlerDidEnd(self, notification): handler = notification.sender log.msg('Media session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_MediaSessionHandlerDidFail(self, notification): handler = notification.sender log.msg('Media session failed sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) notification.center.remove_observer(self, sender=handler) diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index 755ef6d..8f68be8 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,333 +1,333 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.audio import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import ContactHeader, FromHeader, ToHeader from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.configuration import SIPConfig from sylk.session import Session __all__ = ['MediaSessionHandler'] class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side - request_uri = session._invitation.request_uri + request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % self.sip_session.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed (%s)" % (self.sip_session.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() def _NH_SIPSessionGotConferenceInfo(self, notification): self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) def _NH_JingleSessionDidStart(self, notification): log.msg("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.msg("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.msg("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/session.py b/sylk/session.py index e1b4aec..23a41de 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,1935 +1,1939 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # import random from threading import RLock from time import time from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton from application.system import host from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, SIPCoreInvalidStateError, PJSIPError, sip_status_messages from sipsimple.core import ContactHeader, RouteHeader, FromHeader, ToHeader, ReasonHeader, WarningHeader from sipsimple.core import SIPURI, SDPConnection, SDPSession, SDPMediaStream from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class IllegalStateError(RuntimeError): pass @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError, e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class Session(object): implements(IObserver) media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): remote_sdp = invitation.sdp.proposed_remote if not remote_sdp: invitation.send_response(488) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if not self.proposed_streams: invitation.send_response(488) return self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, from_header, to_header, route, streams, is_focus=False, contact_header=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = route self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] except KeyError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) local_ip = contact_header.uri.host connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) route_header = RouteHeader(self.route.uri) if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, ) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' elif e.data.originator == 'local' and e.data.code == 408: code = e.data.code reason = e.data.reason else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and ad self.state = 'sending_proposal' self.greenlet = api.getcurrent() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') self.proposed_streams = streams try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote sdp_connection = remote_sdp.connection or next(media.connection for media in remote_sdp.media if media.connection is not None) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[self._invitation.transport] contact_header.uri = local_contact_uri except KeyError: pass if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) except InvitationDisconnectedError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError, e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream becuase it couldn't # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail_proposal(originator='remote', error=error) except InvitationDisconnectedError, e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError, e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.streams + streams proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': break except SIPCoreError, e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if notification.data.code >= 300: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except (MediaStreamDidFailError, api.TimeoutError), e: if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail_proposal(originator='local', error=error) except InvitationDisconnectedError, e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError, e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError, e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 487: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError, e: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) except InvitationDisconnectedError, e: self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None or self._invitation.state is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break except SIPCoreError, e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) except InvitationDisconnectedError, e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None @property def call_id(self): return self._invitation.call_id if self._invitation is not None else None + @property + def request_uri(self): + return self._invitation.request_uri if self._invitation is not None else None + def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError: pass except InvitationDisconnectedError, e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError, e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError, e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400 <= code <= 699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break except (api.TimeoutError, SIPCoreError): pass notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() has_streams = bool(self.proposed_streams) for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if has_streams else 500) except SIPCoreError: pass notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) except InvitationDisconnectedError, e: self.greenlet = None self.state == 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) except InvitationDisconnectedError, e: self.greenlet = None self.state == 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._invitation.notify_transfer_progress(500) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidStart(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() class SessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: account = DefaultAccount() notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification)