diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py index 79608a8..3995989 100644 --- a/sylk/applications/__init__.py +++ b/sylk/applications/__init__.py @@ -1,306 +1,306 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # __all__ = ['ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger'] import abc import os import socket import struct import sys from application import log from application.configuration.datatypes import NetworkRange from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.types import Singleton from itertools import chain from sipsimple.threading import run_in_twisted_thread from zope.interface import implements from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig SYLK_APP_HEADER = 'X-Sylk-App' class ApplicationRegistry(object): __metaclass__ = Singleton def __init__(self): self.applications = [] def __iter__(self): return iter(self.applications) def add(self, app): if app not in self.applications: self.applications.append(app) class ApplicationName(object): def __get__(self, obj, objtype): name = objtype.__name__ return name[:-11].lower() if name.endswith('Application') else name.lower() class SylkApplicationMeta(abc.ABCMeta, Singleton): """Metaclass for defining SylkServer applications: a Singleton that also adds them to the application registry""" def __init__(cls, name, bases, dic): super(SylkApplicationMeta, cls).__init__(name, bases, dic) if name != 'SylkApplication': ApplicationRegistry().add(cls) class SylkApplication(object): """Base class for all SylkServer applications""" __metaclass__ = SylkApplicationMeta __appname__ = ApplicationName() @abc.abstractmethod def start(self): pass @abc.abstractmethod def stop(self): pass @abc.abstractmethod def incoming_session(self, session): pass @abc.abstractmethod def incoming_subscription(self, subscribe_request, data): pass @abc.abstractmethod def incoming_referral(self, refer_request, data): pass @abc.abstractmethod - def incoming_sip_message(self, message_request, data): + def incoming_message(self, message_request, data): pass def load_builtin_applications(): toplevel = os.path.dirname(__file__) app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] for module in ['sylk.applications.%s' % item for item in set(app_list).difference(ServerConfig.disabled_applications)]: __import__(module) def load_extra_applications(): if ServerConfig.extra_applications_dir: toplevel = os.path.realpath(os.path.abspath(ServerConfig.extra_applications_dir.normalized)) if os.path.isdir(toplevel): app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] sys.path.append(toplevel) for module in (item for item in set(app_list).difference(ServerConfig.disabled_applications)): __import__(module) def load_applications(): load_builtin_applications() load_extra_applications() [app() for app in ApplicationRegistry()] class ApplicationNotLoadedError(Exception): pass class IncomingRequestHandler(object): """ Handle incoming requests and match them to applications. """ __metaclass__ = Singleton implements(IObserver) def __init__(self): load_applications() log.msg('Loaded applications: %s' % ', '.join([app.__appname__ for app in ApplicationRegistry()])) self.application_map = dict((item.split(':')) for item in ServerConfig.application_map) self.authorization_handler = AuthorizationHandler() def start(self): [app().start() for app in ApplicationRegistry()] self.authorization_handler.start() notification_center = NotificationCenter() notification_center.add_observer(self, name='SIPSessionNewIncoming') notification_center.add_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.add_observer(self, name='SIPIncomingReferralGotRefer') notification_center.add_observer(self, name='SIPIncomingRequestGotRequest') def stop(self): self.authorization_handler.stop() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SIPSessionNewIncoming') notification_center.remove_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.remove_observer(self, name='SIPIncomingReferralGotRefer') notification_center.remove_observer(self, name='SIPIncomingRequestGotRequest') [app().stop() for app in ApplicationRegistry()] def get_application(self, ruri, headers): if SYLK_APP_HEADER in headers: application = headers[SYLK_APP_HEADER].body.strip() else: application = ServerConfig.default_application if self.application_map: prefixes = ("%s@%s" % (ruri.user, ruri.host), ruri.host, ruri.user) for prefix in prefixes: if prefix in self.application_map: application = self.application_map[prefix] break try: app = next(app for app in ApplicationRegistry() if app.__appname__ == application) except StopIteration: log.error('Application %s is not loaded' % application) raise ApplicationNotLoadedError else: return app() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewIncoming(self, notification): session = notification.sender try: self.authorization_handler.authorize_source(session.peer_address.ip) except UnauthorizedRequest: session.reject(403) return try: app = self.get_application(session._invitation.request_uri, notification.data.headers) except ApplicationNotLoadedError: session.reject(404) else: app.incoming_session(session) def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification): subscribe_request = notification.sender try: self.authorization_handler.authorize_source(subscribe_request.peer_address.ip) except UnauthorizedRequest: subscribe_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: subscribe_request.reject(404) else: app.incoming_subscription(subscribe_request, notification.data) def _NH_SIPIncomingReferralGotRefer(self, notification): refer_request = notification.sender try: self.authorization_handler.authorize_source(refer_request.peer_address.ip) except UnauthorizedRequest: refer_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: refer_request.reject(404) else: app.incoming_referral(refer_request, notification.data) def _NH_SIPIncomingRequestGotRequest(self, notification): request = notification.sender if notification.data.method != 'MESSAGE': request.answer(405) return try: self.authorization_handler.authorize_source(request.peer_address.ip) except UnauthorizedRequest: request.answer(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: request.answer(404) else: - app.incoming_sip_message(request, notification.data) + app.incoming_message(request, notification.data) class UnauthorizedRequest(Exception): pass class AuthorizationHandler(object): implements(IObserver) def __init__(self): self.state = None self.trusted_peers = SIPConfig.trusted_peers self.thor_nodes = [] @property def trusted_parties(self): if ThorNodeConfig.enabled: return self.thor_nodes return self.trusted_peers def start(self): NotificationCenter().add_observer(self, name='ThorNetworkGotUpdate') self.state = 'started' def stop(self): self.state = 'stopped' NotificationCenter().remove_observer(self, name='ThorNetworkGotUpdate') def authorize_source(self, ip_address): if self.state != 'started': raise UnauthorizedRequest for range in self.trusted_parties: if struct.unpack('!L', socket.inet_aton(ip_address))[0] & range[1] == range[0]: return True raise UnauthorizedRequest @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_ThorNetworkGotUpdate(self, notification): thor_nodes = [] for node in chain(*(n.nodes for n in notification.data.networks.values())): thor_nodes.append(NetworkRange(node)) self.thor_nodes = thor_nodes class ApplicationLogger(object): __metaclass__ = Singleton @classmethod def for_package(cls, package): return cls(package.split('.')[-1]) def __init__(self, prefix): self.prefix = '[%s] ' % prefix def info(self, message, **context): log.info(self.prefix+message, **context) def warning(self, message, **context): log.warning(self.prefix+message, **context) def debug(self, message, **context): log.debug(self.prefix+message, **context) def error(self, message, **context): log.error(self.prefix+message, **context) def critical(self, message, **context): log.critical(self.prefix+message, **context) def exception(self, message=None, **context): if message is not None: message = self.prefix+message log.exception(message, **context) # Some aliases that are commonly used msg = info warn = warning fatal = critical err = exception diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index fa15959..95b4943 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,428 +1,428 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import mimetypes import os import re from application.notification import IObserver, NotificationCenter from application.python import Null from gnutls.interfaces.twisted import X509Credentials from sipsimple.account import AccountManager from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup from sipsimple.streams import AudioStream from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.applications.conference.room import Room from sylk.applications.conference.web import ScreenSharingWebServer from sylk.bonjour import BonjourServices from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.extensions import ChatStream from sylk.session import Session from sylk.tls import Certificate, PrivateKey class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self._rooms = {} self.pending_sessions = [] self.invited_participants_map = {} self.bonjour_focus_service = Null() self.bonjour_room_service = Null() self.screen_sharing_web_server = None def start(self): if ServerConfig.enable_bonjour: self.bonjour_focus_service = BonjourServices(service='sipfocus') self.bonjour_focus_service.start() log.msg("Bonjour publication started for service 'sipfocus'") self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference') self.bonjour_room_service.start() self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants') log.msg("Bonjour publication started for service 'sipuri'") self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir) if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None: cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized) key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized) credentials = X509Credentials(cert, key) else: credentials = None self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials) listen_address = self.screen_sharing_web_server.listener.getHost() log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port)) def stop(self): self.bonjour_focus_service.stop() self.bonjour_room_service.stop() self.screen_sharing_web_server.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported') session.reject(488) return try: self.validate_acl(session._invitation.request_uri, session.remote_identity.uri) except ACLValidationError: log.msg(u'Session rejected: unauthorized by access list') session.reject(403) return # Check if requested files belong to this room for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'): try: room = self.get_room(session._invitation.request_uri) except RoomNotFoundError: log.msg(u'Session rejected: room not found') session.reject(404) return try: file = next(file for file in room.files if file.hash == stream.file_selector.hash) except StopIteration: log.msg(u'Session rejected: requested file not found') session.reject(404) return filename = os.path.basename(file.name) for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)): if filename in filenames: path = os.path.join(dirpath, filename) stream.file_selector.fd = open(path, 'r') if stream.file_selector.size is None: stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size if stream.file_selector.type is None: mime_type, encoding = mimetypes.guess_type(filename) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' stream.file_selector.type = type break else: # File got removed from the filesystem log.msg(u'Session rejected: requested file removed from the filesystem') session.reject(404) return self.pending_sessions.append(session) NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if subscribe_request.event != 'conference': log.msg(u'Subscription rejected: only conference event is supported') subscribe_request.reject(489) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): log.msg(u'Subscription rejected: unauthorized by access list') subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: log.msg(u'Subscription rejected: room not yet created') subscribe_request.reject(480) return if not room.started: log.msg(u'Subscription rejected: room not started yet') subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri)) try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri) refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() - def incoming_sip_message(self, message_request, data): - log.msg(u'SIP Message is not supported, use MSRP media instead') + def incoming_message(self, message_request, data): + log.msg(u'SIP MESSAGE is not supported, use MSRP media instead') message_request.answer(405) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def add_participant(self, session, room_uri): # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri)) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 NotificationCenter().add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri)) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = self.get_room(session._invitation.request_uri, True) # FIXME room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) if session.direction == 'incoming': room_uri = session._invitation.request_uri # FIXME else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) notification.center.remove_observer(self, sender=session) log.msg(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason)) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.request_uri self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host) self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'INVITE': self._refer_request.accept() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'BYE': log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = AccountManager().sylkserver_account conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: log.msg('Room %s - failed to add %s to %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return else: active_media = room.active_media if not active_media: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return if 'audio' in active_media: self.streams.append(AudioStream()) if 'chat' in active_media: self.streams.append(ChatStream()) self.session = Session(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) subject = u'Join conference request from %s' % original_identity self.session.connect(from_header, to_header, contact_header=contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, subject=subject, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) log.msg('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/echo/__init__.py b/sylk/applications/echo/__init__.py index e87c24d..e69c985 100644 --- a/sylk/applications/echo/__init__.py +++ b/sylk/applications/echo/__init__.py @@ -1,148 +1,148 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication, ApplicationLogger log = ApplicationLogger.for_package(__package__) def format_identity(identity): return u'%s ' % (identity.display_name, identity.uri.user, identity.uri.host) class EchoApplication(SylkApplication): implements(IObserver) def start(self): self.pending = set() self.sessions = set() def stop(self): self.pending.clear() self.sessions.clear() def incoming_session(self, session): session.call_id = session._invitation.call_id log.msg(u'New incoming session %s from %s' % (session.call_id, format_identity(session.remote_identity))) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: log.msg(u'Session %s rejected: invalid media, only RTP audio and MSRP chat are supported' % session.call_id) session.reject(488) return NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(2 if audio_streams else 0, self._accept_session, session, streams) self.pending.add(session) session._end_timer = None def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) - def incoming_sip_message(self, request, data): + def incoming_message(self, request, data): request.reject(405) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _accept_session(self, session, streams): if session in self.pending: session.accept(streams) def _make_audio_stream_echo(self, stream): if stream.producer_slot is not None and stream.consumer_slot is not None: # TODO: handle slot changes stream.bridge.remove(stream.device) stream.mixer.connect_slots(stream.producer_slot, stream.consumer_slot) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending.remove(session) self.sessions.add(session) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: audio_stream = None try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: chat_stream = None log.msg('Session %s started' % session.call_id) if audio_stream is not None: self._make_audio_stream_echo(audio_stream) notification.center.add_observer(self, sender=audio_stream) if chat_stream is not None: notification.center.add_observer(self, sender=chat_stream) session._end_timer = reactor.callLater(600, session.end) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session %s ended' % session.call_id) notification.center.remove_observer(self, sender=session) # We could get DidEnd even if we never got DidStart self.sessions.discard(session) self.pending.discard(session) if session._end_timer is not None and session._end_timer.active(): session._end_timer.cancel() session._end_timer = None def _NH_SIPSessionDidFail(self, notification): session = notification.sender log.msg(u'Session %s failed from %s' % (session.call_id, format_identity(session.remote_identity))) self.pending.remove(session) notification.center.remove_observer(self, sender=session) if session._end_timer is not None and session._end_timer.active(): session._end_timer.cancel() session._end_timer = None def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender message = notification.data.message content_type = message.content_type.lower() if content_type.startswith('text/'): stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') stream.send_message(message.body, message.content_type) else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] session.accept_proposal(streams) def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) log.msg(u'Session %s has added %s' % (session.call_id, stream.type)) if stream.type == 'audio': self._make_audio_stream_echo(stream) for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) log.msg(u'Session %s has removed %s' % (session.call_id, stream.type)) if not session.streams: log.msg(u'Session %s has removed all streams, session will be terminated' % session.call_id) session.end() def _NH_SIPSessionTransferNewIncoming(self, notification): notification.sender.reject_transfer(403) diff --git a/sylk/applications/ircconference/__init__.py b/sylk/applications/ircconference/__init__.py index aca8ea5..59ad8ea 100644 --- a/sylk/applications/ircconference/__init__.py +++ b/sylk/applications/ircconference/__init__.py @@ -1,100 +1,100 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.ircconference.logger import log from sylk.applications.ircconference.room import IRCRoom class IRCConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self.rooms = set() self.pending_sessions = [] def start(self): pass def stop(self): pass def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject(488) return self.pending_sessions.append(session) NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() if chat_streams: for stream in chat_streams: # Disable chatroom capabilities other than nickname stream.chatroom_capabilities = ['nickname'] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): to_header = data.headers.get('To', Null) if to_header is Null: subscribe_request.reject(400) return room = IRCRoom.get_room(data.request_uri) if not room.started: room = IRCRoom.get_room(to_header.uri) if not room.started: subscribe_request.reject(480) return room.handle_incoming_subscription(subscribe_request, data) - def incoming_referral(self, refer_request, data): - pass + def incoming_referral(self, request, data): + request.reject(405) - def incoming_sip_message(self, message_request, data): - pass + def incoming_message(self, request, data): + request.reject(405) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = IRCRoom.get_room(session._invitation.request_uri) # FIXME room.start() room.add_session(session) self.rooms.add(room) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) NotificationCenter().remove_observer(self, sender=session) room = IRCRoom.get_room(session._invitation.request_uri) # FIXME if session in room.sessions: # We could get this notifiction even if we didn't get SIPSessionDidStart room.remove_session(session) if room.empty: room.stop() try: self.rooms.remove(room) except KeyError: pass def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) diff --git a/sylk/applications/playback/__init__.py b/sylk/applications/playback/__init__.py index 38acfbd..ca0d517 100644 --- a/sylk/applications/playback/__init__.py +++ b/sylk/applications/playback/__init__.py @@ -1,124 +1,124 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.python import Null from application.notification import IObserver, NotificationCenter from eventlib import proc from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication, ApplicationLogger from sylk.applications.playback.configuration import get_file_for_uri log = ApplicationLogger.for_package(__package__) class PlaybackApplication(SylkApplication): implements(IObserver) def start(self): pass def stop(self): pass def incoming_session(self, session): log.msg('Incoming session %s from %s to %s' % (session._invitation.call_id, session.remote_identity.uri, session.local_identity.uri)) try: audio_stream = next(stream for stream in session.proposed_streams if stream.type=='audio') except StopIteration: log.msg(u'Session %s rejected: invalid media, only RTP audio is supported' % session.call_id) session.reject(488) return else: notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) session.send_ring_indication() # TODO: configurable answer delay reactor.callLater(1, self._accept_session, session, audio_stream) def _accept_session(self, session, audio_stream): if session.state == 'incoming': session.accept([audio_stream]) def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) - def incoming_sip_message(self, request, data): + def incoming_message(self, request, data): request.reject(405) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_green_thread def _NH_SIPSessionDidStart(self, notification): session = notification.sender log.msg('Session %s started' % session._invitation.call_id) handler = PlaybackHandler(session) handler.run() def _NH_SIPSessionDidFail(self, notification): session = notification.sender log.msg('Session %s failed' % session._invitation.call_id) NotificationCenter().remove_observer(self, sender=session) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session %s ended' % session._invitation.call_id) NotificationCenter().remove_observer(self, sender=session) def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender session.reject_proposal() class InterruptPlayback(Exception): pass class PlaybackHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.proc = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) def run(self): self.proc = proc.spawn(self._play) def _play(self): ruri = self.session._invitation.request_uri file = get_file_for_uri('%s@%s' % (ruri.user, ruri.host)) audio_stream = self.session.streams[0] player = WavePlayer(audio_stream.mixer, file) audio_stream.bridge.add(player) log.msg(u"Playing file %s for session %s" % (file, self.session._invitation.call_id)) try: player.play().wait() except (ValueError, WavePlayerError), e: log.warning(u"Error playing file %s: %s" % (file, e)) except InterruptPlayback: pass finally: self.proc = None audio_stream.bridge.remove(player) self.session.end() self.session = None def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self.proc: self.proc.kill(InterruptPlayback) diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index c69487f..0a043a5 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,540 +1,540 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI, SIPCoreError from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler from sylk.applications.xmppgateway.media import MediaSessionHandler from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage class XMPPGatewayApplication(SylkApplication): implements(IObserver) def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() self.media_sessions = set() self.s2x_muc_sessions = {} self.x2s_muc_sessions = {} self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} self.s2x_muc_add_participant_handlers = {} self.x2s_muc_add_participant_handlers = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.start() def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.stop() def incoming_session(self, session): stream_types = set([stream.type for stream in session.proposed_streams]) if 'chat' in stream_types: log.msg('New chat session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_chat_session(session) elif 'audio' in stream_types: log.msg('New audio session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_media_session(session) else: log.msg('New session from %s to %s rejected. Unsupported media: %s ' % (session.remote_identity.uri, session.local_identity.uri, stream_types)) session.reject(488) def incoming_chat_session(self, session): # Check if this session is really an invitation to add a participant to a conference room / muc if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters: try: referred_by_uri = SIPURI.parse(session.transfer_info.referred_by) except SIPCoreError: log.msg("SIP multiparty session invitation %s failed: invalid Referred-By header" % session._invitation.call_id) session.reject(488) return muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host) inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host) sender = Identity(muc_uri) recipient = Identity(recipient_uri) inviter = Identity(inviter_uri) try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] except KeyError: handler = S2XMucInvitationHandler(session, sender, recipient, inviter) self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() else: log.msg("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session._invitation.call_id, format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'))) session.reject(480) return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: log.msg('Session rejected: From domain is not a local XMPP domain') session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one log.msg('Session rejected: other session with same identifiers in progress') session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) NotificationCenter().add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session def incoming_media_session(self, session): if session.remote_identity.uri.host not in self.xmpp_manager.domains|self.xmpp_manager.muc_domains: log.msg('Session rejected: From domain is not a local XMPP domain') session.reject(403) return handler = MediaSessionHandler.new_from_sip_session(session) if handler is not None: NotificationCenter().add_observer(self, sender=handler) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return log.msg('SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp'))) if subscribe_request.event != 'presence': log.msg('SIP subscription rejected: only presence event is supported') subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: log.msg('SIP subscription rejected: From domain is not a local XMPP domain') subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) - def incoming_sip_message(self, message_request, data): + def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.msg('New SIP Message from %s to %s' % (from_header.uri, to_header.uri)) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: log.msg('Message rejected: From domain is not a local XMPP domain') message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMMessage.parse(data.body) except CPIMParserError: log.msg('Message rejected: CPIM parse error') message_request.answer(400) return else: body = cpim_message.body content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if content_type == 'text/plain': html_body = None else: html_body = body body = None if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.enqueue_xmpp_message(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() def _NH_XMPPGotMucJoinRequest(self, notification): stanza = notification.data.stanza muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) nickname = stanza.recipient.uri.resource try: handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)] except KeyError: xmpp_identity = stanza.sender sip_identity = stanza.recipient sip_identity.uri = muc_uri handler = X2SMucHandler(sip_identity, xmpp_identity, nickname) handler._first_stanza = stanza notification.center.add_observer(self, sender=handler) handler.start() # Check if there was a pending join request on the SIP side try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))] except KeyError: pass else: handler.stop() def _NH_XMPPGotMucAddParticipantRequest(self, notification): sender = notification.data.sender recipient = notification.data.recipient participant = notification.data.participant muc_uri = FrozenURI(recipient.uri.user, recipient.uri.host) sender_uri = FrozenURI(sender.uri.user, sender.uri.host) participant_uri = FrozenURI(participant.uri.user, participant.uri.host) sender = Identity(sender_uri) recipient = Identity(muc_uri) participant = Identity(participant_uri) try: handler = self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] except KeyError: handler = X2SMucInvitationHandler(sender, recipient, participant) self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.msg('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.msg('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.msg('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason)) notification.center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender log.msg('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) log.msg('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.msg('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions.keys()), len(self.x2s_presence_subscriptions.keys()))) # MUC handling def _NH_X2SMucHandlerDidStart(self, notification): handler = notification.sender log.msg('Multiparty session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SMucHandlerDidEnd(self, notification): handler = notification.sender log.msg('Multiparty session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidStart(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s invited %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) def _NH_X2SMucInvitationHandlerDidEnd(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidFail(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'), notification.data.failure)) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidStart(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg("%s invited %s to multiparty chat %s" % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) def _NH_S2XMucInvitationHandlerDidEnd(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s added %s to multiparty chat %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidFail(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.msg('%s could not add %s to multiparty chat %s: %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'), str(notification.data.failure))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) # Media sessions def _NH_JingleSessionNewIncoming(self, notification): session = notification.sender handler = MediaSessionHandler.new_from_jingle_session(session) if handler is not None: notification.center.add_observer(self, sender=handler) def _NH_MediaSessionHandlerDidStart(self, notification): handler = notification.sender log.msg('Media session started sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.add(handler) def _NH_MediaSessionHandlerDidEnd(self, notification): handler = notification.sender log.msg('Media session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_MediaSessionHandlerDidFail(self, notification): handler = notification.sender log.msg('Media session failed sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) notification.center.remove_observer(self, sender=handler)