diff --git a/sylk/__init__.py b/sylk/__init__.py index 0524d4b..9ad252c 100644 --- a/sylk/__init__.py +++ b/sylk/__init__.py @@ -1,7 +1,4 @@ -"""SylkServer""" - __version__ = '4.1.0' -configuration_filename = "config.ini" - +configuration_filename = 'config.ini' diff --git a/sylk/accounts.py b/sylk/accounts.py index a570a80..01054ff 100644 --- a/sylk/accounts.py +++ b/sylk/accounts.py @@ -1,74 +1,75 @@ from application.system import host from sipsimple.account import Account, AccountManager from sipsimple.configuration import SettingsObject from sipsimple.configuration.datatypes import SIPAddress from sipsimple.core import Engine, Route, SIPURI from sylk.configuration import SIPConfig -__all__ = ['DefaultAccount'] + +__all__ = 'DefaultAccount', class DefaultContactURIFactory(object): def __init__(self): self.username = 'sylkserver' def __getitem__(self, key): if isinstance(key, tuple): # The first part of the key might be PublicGRUU and so on, but we don't care about # those here, so ignore them _, key = key if not isinstance(key, (basestring, Route)): raise KeyError("key must be a transport name or Route instance") transport = key if isinstance(key, basestring) else key.transport parameters = {} if transport=='udp' else {'transport': transport} if SIPConfig.local_ip not in (None, '0.0.0.0'): ip = SIPConfig.local_ip.normalized elif isinstance(key, basestring): ip = host.default_ip else: ip = host.outgoing_ip_for(key.address) if ip is None: raise KeyError("could not get outgoing IP address") port = getattr(Engine(), '%s_port' % transport, None) if port is None: raise KeyError("unsupported transport: %s" % transport) uri = SIPURI(user=self.username, host=ip, port=port) uri.parameters.update(parameters) return uri class DefaultAccount(Account): """ Subclass of Account which doesn't start any subsystem. SylkServer just uses it as the default account for all applications as a settings object. """ __id__ = SIPAddress('default@sylkserver') id = property(lambda self: self.__id__) enabled = True def __new__(cls): with AccountManager.load.lock: if not AccountManager.load.called: raise RuntimeError("cannot instantiate %s before calling AccountManager.load" % cls.__name__) return SettingsObject.__new__(cls) def __init__(self): super(DefaultAccount, self).__init__('default@sylkserver') self.contact = DefaultContactURIFactory() @property def uri(self): return SIPURI(user='sylkserver', host=SIPConfig.local_ip.normalized) def _activate(self): pass def _deactivate(self): pass diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py index f63415d..d5f2cac 100644 --- a/sylk/applications/__init__.py +++ b/sylk/applications/__init__.py @@ -1,330 +1,331 @@ -__all__ = ['ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger'] - import abc import imp import logging import os import socket import struct import sys from application import log from application.configuration.datatypes import NetworkRange from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.decorator import execute_once from application.python.types import Singleton from collections import defaultdict from itertools import chain from sipsimple.threading import run_in_twisted_thread from zope.interface import implements from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig +__all__ = 'ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger' + + SYLK_APP_HEADER = 'X-Sylk-App' def find_builtin_applications(): applications_directory = os.path.dirname(__file__) for path, dirs, files in os.walk(applications_directory): parent_directory, name = os.path.split(path) if parent_directory == applications_directory and '__init__.py' in files and name not in ServerConfig.disabled_applications: yield name if path != applications_directory: del dirs[:] # do not descend more than 1 level def find_extra_applications(): if ServerConfig.extra_applications_dir: applications_directory = os.path.realpath(ServerConfig.extra_applications_dir.normalized) for path, dirs, files in os.walk(applications_directory): parent_directory, name = os.path.split(path) if parent_directory == applications_directory and '__init__.py' in files and name not in ServerConfig.disabled_applications: yield name if path != applications_directory: del dirs[:] # do not descend more than 1 level def find_applications(): return chain(find_builtin_applications(), find_extra_applications()) class ApplicationRegistry(object): __metaclass__ = Singleton def __init__(self): self.application_map = {} def __getitem__(self, name): return self.application_map[name] def __contains__(self, name): return name in self.application_map def __iter__(self): return iter(self.application_map.values()) def __len__(self): return len(self.application_map) @execute_once def load_applications(self): for name in find_builtin_applications(): try: __import__('sylk.applications.{name}'.format(name=name)) except ImportError as e: log.error('Failed to load builtin application {name!r}: {exception!s}'.format(name=name, exception=e)) for name in find_extra_applications(): if name in sys.modules: # being able to log this is contingent on this function only executing once log.warning('Not loading extra application {name!r} as it would overshadow a system package/module'.format(name=name)) continue try: imp.load_module(name, *imp.find_module(name, [ServerConfig.extra_applications_dir.normalized])) except ImportError as e: log.error('Failed to load extra application {name!r}: {exception!s}'.format(name=name, exception=e)) def add(self, app_class): try: app = app_class() except Exception as e: log.exception('Failed to initialize {app.__appname__!r} application: {exception!s}'.format(app=app_class, exception=e)) else: self.application_map[app.__appname__] = app def get(self, name, default=None): return self.application_map.get(name, default) class ApplicationName(object): def __get__(self, instance, instance_type): name = instance_type.__name__ return name[:-11].lower() if name.endswith('Application') else name.lower() class SylkApplicationMeta(abc.ABCMeta, Singleton): """Metaclass for defining SylkServer applications: a Singleton that also adds them to the application registry""" def __init__(cls, name, bases, dic): super(SylkApplicationMeta, cls).__init__(name, bases, dic) if name != 'SylkApplication': ApplicationRegistry().add(cls) class SylkApplication(object): """Base class for all SylkServer applications""" __metaclass__ = SylkApplicationMeta __appname__ = ApplicationName() @abc.abstractmethod def start(self): pass @abc.abstractmethod def stop(self): pass @abc.abstractmethod def incoming_session(self, session): pass @abc.abstractmethod def incoming_subscription(self, subscribe_request, data): pass @abc.abstractmethod def incoming_referral(self, refer_request, data): pass @abc.abstractmethod def incoming_message(self, message_request, data): pass class ApplicationNotLoadedError(Exception): pass class IncomingRequestHandler(object): """Handle incoming requests and match them to applications""" __metaclass__ = Singleton implements(IObserver) def __init__(self): self.application_registry = ApplicationRegistry() self.application_registry.load_applications() log.info('Loaded applications: {}'.format(', '.join(sorted(app.__appname__ for app in self.application_registry)))) if ServerConfig.default_application not in self.application_registry: log.warning('Default application "%s" does not exist, falling back to "conference"' % ServerConfig.default_application) ServerConfig.default_application = 'conference' else: log.info('Default application: %s' % ServerConfig.default_application) self.application_map = dict((item.split(':')) for item in ServerConfig.application_map) if self.application_map: txt = 'Application map:\n' inverted_app_map = defaultdict(list) for url, app in self.application_map.iteritems(): inverted_app_map[app].append(url) for app, urls in inverted_app_map.iteritems(): txt += ' {}: {}\n'.format(app, ', '.join(urls)) log.info(txt[:-1]) self.authorization_handler = AuthorizationHandler() def start(self): for app in self.application_registry: try: app.start() except Exception as e: log.exception('Failed to start {app.__appname__!r} application: {exception!s}'.format(app=app, exception=e)) self.authorization_handler.start() notification_center = NotificationCenter() notification_center.add_observer(self, name='SIPSessionNewIncoming') notification_center.add_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.add_observer(self, name='SIPIncomingReferralGotRefer') notification_center.add_observer(self, name='SIPIncomingRequestGotRequest') def stop(self): self.authorization_handler.stop() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SIPSessionNewIncoming') notification_center.remove_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.remove_observer(self, name='SIPIncomingReferralGotRefer') notification_center.remove_observer(self, name='SIPIncomingRequestGotRequest') for app in self.application_registry: try: app.stop() except Exception as e: log.exception('Failed to stop {app.__appname__!r} application: {exception!s}'.format(app=app, exception=e)) def get_application(self, ruri, headers): if SYLK_APP_HEADER in headers: application_name = headers[SYLK_APP_HEADER].body.strip() else: application_name = ServerConfig.default_application if self.application_map: prefixes = ("%s@%s" % (ruri.user, ruri.host), ruri.host, ruri.user) for prefix in prefixes: if prefix in self.application_map: application_name = self.application_map[prefix] break try: return self.application_registry[application_name] except KeyError: log.error('Application %s is not loaded' % application_name) raise ApplicationNotLoadedError @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewIncoming(self, notification): session = notification.sender try: self.authorization_handler.authorize_source(session.peer_address.ip) except UnauthorizedRequest: session.reject(403) return try: app = self.get_application(session.request_uri, notification.data.headers) except ApplicationNotLoadedError: session.reject(404) else: app.incoming_session(session) def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification): subscribe_request = notification.sender try: self.authorization_handler.authorize_source(subscribe_request.peer_address.ip) except UnauthorizedRequest: subscribe_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: subscribe_request.reject(404) else: app.incoming_subscription(subscribe_request, notification.data) def _NH_SIPIncomingReferralGotRefer(self, notification): refer_request = notification.sender try: self.authorization_handler.authorize_source(refer_request.peer_address.ip) except UnauthorizedRequest: refer_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: refer_request.reject(404) else: app.incoming_referral(refer_request, notification.data) def _NH_SIPIncomingRequestGotRequest(self, notification): request = notification.sender if notification.data.method != 'MESSAGE': request.answer(405) return try: self.authorization_handler.authorize_source(request.peer_address.ip) except UnauthorizedRequest: request.answer(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: request.answer(404) else: app.incoming_message(request, notification.data) class UnauthorizedRequest(Exception): pass class AuthorizationHandler(object): implements(IObserver) def __init__(self): self.state = None self.trusted_peers = SIPConfig.trusted_peers self.thor_nodes = [] @property def trusted_parties(self): if ThorNodeConfig.enabled: return self.thor_nodes return self.trusted_peers def start(self): NotificationCenter().add_observer(self, name='ThorNetworkGotUpdate') self.state = 'started' def stop(self): self.state = 'stopped' NotificationCenter().remove_observer(self, name='ThorNetworkGotUpdate') def authorize_source(self, ip_address): if self.state != 'started': raise UnauthorizedRequest for range in self.trusted_parties: if struct.unpack('!L', socket.inet_aton(ip_address))[0] & range[1] == range[0]: return True raise UnauthorizedRequest @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_ThorNetworkGotUpdate(self, notification): self.thor_nodes = [NetworkRange(node) for node in chain.from_iterable(n.nodes for n in notification.data.networks.values())] class ApplicationLogger(object): def __new__(cls, package): return logging.getLogger(package.split('.')[-1]) diff --git a/sylk/applications/conference/logger.py b/sylk/applications/conference/logger.py index 4eb1438..78a9ea7 100644 --- a/sylk/applications/conference/logger.py +++ b/sylk/applications/conference/logger.py @@ -1,7 +1,6 @@ -__all__ = ['log'] - from sylk.applications import ApplicationLogger -log = ApplicationLogger(__package__) +__all__ = 'log', +log = ApplicationLogger(__package__) diff --git a/sylk/applications/ircconference/configuration.py b/sylk/applications/ircconference/configuration.py index b15194e..56b908a 100644 --- a/sylk/applications/ircconference/configuration.py +++ b/sylk/applications/ircconference/configuration.py @@ -1,29 +1,31 @@ -__all__ = ['get_room_configuration'] - from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import EndpointAddress +__all__ = 'get_room_configuration', + + def get_room_configuration(room): IRCConferenceConfig.read(section=room) config = Configuration(dict(IRCConferenceConfig)) IRCConferenceConfig.reset() return config class Configuration(object): def __init__(self, data): self.__dict__.update(data) + class IRCServer(EndpointAddress): default_port = 6667 name = 'IRC server address' + class IRCConferenceConfig(ConfigSection): __cfgfile__ = 'ircconference.ini' channel = 'test' server = ConfigSetting(type=IRCServer, value='irc.freenode.net:6667') website = 'http://sylkserver.com' - diff --git a/sylk/applications/ircconference/logger.py b/sylk/applications/ircconference/logger.py index 4eb1438..78a9ea7 100644 --- a/sylk/applications/ircconference/logger.py +++ b/sylk/applications/ircconference/logger.py @@ -1,7 +1,6 @@ -__all__ = ['log'] - from sylk.applications import ApplicationLogger -log = ApplicationLogger(__package__) +__all__ = 'log', +log = ApplicationLogger(__package__) diff --git a/sylk/applications/playback/configuration.py b/sylk/applications/playback/configuration.py index 88cc01e..17c4b76 100644 --- a/sylk/applications/playback/configuration.py +++ b/sylk/applications/playback/configuration.py @@ -1,46 +1,46 @@ -__all__ = ['get_config'] - import os from application.configuration import ConfigFile, ConfigSection, ConfigSetting from sylk.configuration.datatypes import Path from sylk.resources import Resources +__all__ = 'get_config', + + class GeneralConfig(ConfigSection): __cfgfile__ = 'playback.ini' __section__ = 'Playback' files_dir = ConfigSetting(type=Path, value=Path(Resources.get('sounds/playback'))) enable_video = False answer_delay = 1 class PlaybackConfig(ConfigSection): __cfgfile__ = 'playback.ini' file = ConfigSetting(type=Path, value=None) enable_video = GeneralConfig.enable_video answer_delay = GeneralConfig.answer_delay class Configuration(object): def __init__(self, data): self.__dict__.update(data) def get_config(uri): config_file = ConfigFile(PlaybackConfig.__cfgfile__) GeneralConfig.read(cfgfile=config_file) section = config_file.get_section(uri) if section is not None: PlaybackConfig.read(section=uri) if not os.path.isabs(PlaybackConfig.file): PlaybackConfig.file = os.path.join(GeneralConfig.files_dir, PlaybackConfig.file) config = Configuration(dict(PlaybackConfig)) PlaybackConfig.reset() return config return None - diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py index a1a088f..7f86f95 100644 --- a/sylk/applications/webrtcgateway/models/sylkrtc.py +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -1,200 +1,199 @@ -__all__ = ['AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest', - 'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest', - 'AckResponse', 'ErrorResponse', - 'ReadyEvent'] - import re from jsonmodels import models, fields, errors, validators from sipsimple.core import SIPURI, SIPCoreError +__all__ = ('AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest', + 'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest', + 'AckResponse', 'ErrorResponse', + 'ReadyEvent') + SIP_PREFIX_RE = re.compile('^sips?:') class DefaultValueField(fields.BaseField): def __init__(self, value): self.default_value = value super(DefaultValueField, self).__init__() def validate(self, value): if value != self.default_value: raise errors.ValidationError('%s does not match the expected value %s' % (value, self.default_value)) def get_default_value(self): return self.default_value def URIValidator(value): uri = SIP_PREFIX_RE.sub('', value) try: SIPURI.parse('sip:%s' % uri) except SIPCoreError: raise errors.ValidationError('invalid URI: %s' % value) def URIListValidator(values): for item in values: URIValidator(item) class OptionsValidator(object): def __init__(self, options): self.options = options def __call__(self, value): if value not in self.options: raise errors.ValidationError('invalid option: %s' % value) # Base models class SylkRTCRequestBase(models.Base): transaction = fields.StringField(required=True) class SylkRTCResponseBase(models.Base): transaction = fields.StringField(required=True) # Miscellaneous models class AckResponse(SylkRTCResponseBase): sylkrtc = DefaultValueField('ack') class ErrorResponse(SylkRTCResponseBase): sylkrtc = DefaultValueField('error') error = fields.StringField(required=True) class ICECandidate(models.Base): candidate = fields.StringField(required=True) sdpMLineIndex = fields.IntField(required=True) sdpMid = fields.StringField(required=True) class ReadyEvent(models.Base): sylkrtc = DefaultValueField('event') event = DefaultValueField('ready') # Account models class AccountRequestBase(SylkRTCRequestBase): account = fields.StringField(required=True, validators=[URIValidator]) class AccountAddRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-add') password = fields.StringField(required=True, validators=[validators.Length(minimum_value=1, maximum_value=9999)]) display_name = fields.StringField(required=False) user_agent = fields.StringField(required=False) class AccountRemoveRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-remove') class AccountRegisterRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-register') class AccountUnregisterRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-unregister') class AccountDeviceTokenRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-devicetoken') old_token = fields.StringField(required=False) new_token = fields.StringField(required=False) # Session models class SessionRequestBase(SylkRTCRequestBase): session = fields.StringField(required=True) class SessionCreateRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-create') account = fields.StringField(required=True, validators=[URIValidator]) uri = fields.StringField(required=True, validators=[URIValidator]) sdp = fields.StringField(required=True) class SessionAnswerRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-answer') sdp = fields.StringField(required=True) class SessionTrickleRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-trickle') candidates = fields.ListField([ICECandidate]) class SessionTerminateRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-terminate') # VideoRoom models class VideoRoomRequestBase(SylkRTCRequestBase): session = fields.StringField(required=True) class VideoRoomJoinRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-join') account = fields.StringField(required=True, validators=[URIValidator]) uri = fields.StringField(required=True, validators=[URIValidator]) sdp = fields.StringField(required=True) class VideoRoomControlTrickleRequest(models.Base): # ID for the subscriber session, if specified, otherwise the publisher is considered session = fields.StringField(required=False) candidates = fields.ListField([ICECandidate]) class VideoRoomControlUpdateRequest(models.Base): audio = fields.BoolField(required=False) video = fields.BoolField(required=False) bitrate = fields.IntField(required=False) class VideoRoomControlFeedAttachRequest(models.Base): session = fields.StringField(required=True) publisher = fields.StringField(required=True) class VideoRoomControlFeedAnswerRequest(models.Base): session = fields.StringField(required=True) sdp = fields.StringField(required=True) class VideoRoomControlFeedDetachRequest(models.Base): session = fields.StringField(required=True) class VideoRoomControlInviteParticipantsRequest(models.Base): participants = fields.ListField([str, unicode], validators=[URIListValidator]) class VideoRoomControlRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-ctl') option = fields.StringField(required=True, validators=[OptionsValidator(['trickle', 'update', 'feed-attach', 'feed-answer', 'feed-detach', 'invite-participants'])]) # all other options should have optional fields below, and the application needs to do a little validation trickle = fields.EmbeddedField(VideoRoomControlTrickleRequest, required=False) update = fields.EmbeddedField(VideoRoomControlUpdateRequest, required=False) feed_attach = fields.EmbeddedField(VideoRoomControlFeedAttachRequest, required=False) feed_answer = fields.EmbeddedField(VideoRoomControlFeedAnswerRequest, required=False) feed_detach = fields.EmbeddedField(VideoRoomControlFeedDetachRequest, required=False) invite_participants = fields.EmbeddedField(VideoRoomControlInviteParticipantsRequest, required=False) class VideoRoomTerminateRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-terminate') - diff --git a/sylk/applications/webrtcgateway/push.py b/sylk/applications/webrtcgateway/push.py index 7a27baa..b56b338 100644 --- a/sylk/applications/webrtcgateway/push.py +++ b/sylk/applications/webrtcgateway/push.py @@ -1,88 +1,81 @@ import json from sipsimple.util import ISOTimestamp from twisted.internet import defer, reactor from twisted.web.client import Agent from twisted.web.iweb import IBodyProducer from twisted.web.http_headers import Headers from zope.interface import implementer from sylk.applications.webrtcgateway.configuration import GeneralConfig from sylk.applications.webrtcgateway.logger import log -__all__ = ['incoming_session', 'missed_session'] + +__all__ = 'incoming_session', 'missed_session' agent = Agent(reactor) headers = Headers({'User-Agent': ['SylkServer'], 'Content-Type': ['application/json'], 'Authorization': ['key=%s' % GeneralConfig.firebase_server_key]}) FIREBASE_API_URL = 'https://fcm.googleapis.com/fcm/send' @implementer(IBodyProducer) class StringProducer(object): def __init__(self, data): self.body = data self.length = len(data) def startProducing(self, consumer): consumer.write(self.body) return defer.succeed(None) def pauseProducing(self): pass def stopProducing(self): pass def incoming_session(originator, destination, tokens): for token in tokens: - data = {'to': token, - 'notification': {}, - 'data': {'sylkrtc': {}}, - 'content_available': True - } + data = dict(to=token, notification={}, data={'sylkrtc': {}}, content_available=True) data['notification']['body'] = 'Incoming call from %s' % originator data['notification']['sound'] = 'Blow' data['priority'] = 'high' data['time_to_live'] = 60 # don't deliver if phone is out for over a minute data['data']['sylkrtc']['event'] = 'incoming_session' data['data']['sylkrtc']['originator'] = originator data['data']['sylkrtc']['destination'] = destination data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now()) _send_push_notification(json.dumps(data)) def missed_session(originator, destination, tokens): for token in tokens: - data = {'to': token, - 'notification': {}, - 'data': {'sylkrtc': {}}, - 'content_available': True - } + data = dict(to=token, notification={}, data={'sylkrtc': {}}, content_available=True) data['notification']['body'] = 'Missed call from %s' % originator data['notification']['sound'] = 'Blow' data['priority'] = 'high' # No TTL, default is 4 weeks data['data']['sylkrtc']['event'] = 'missed_session' data['data']['sylkrtc']['originator'] = originator data['data']['sylkrtc']['destination'] = destination data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now()) _send_push_notification(json.dumps(data)) @defer.inlineCallbacks def _send_push_notification(payload): if GeneralConfig.firebase_server_key: try: r = yield agent.request('POST', FIREBASE_API_URL, headers, StringProducer(payload)) except Exception as e: log.info('Error sending Firebase message: %s', e) else: if r.code != 200: log.warn('Error sending Firebase message: %s' % r.phrase) else: log.warn('Cannot send push notification: no Firebase server key configured') diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 49f9c04..4f86977 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,49 +1,50 @@ -__all__ = ['TokenStorage'] - import cPickle as pickle import os from application.python.types import Singleton from collections import defaultdict from sipsimple.threading import run_in_thread from sylk.configuration import ServerConfig +__all__ = 'TokenStorage', + + # TODO: This implementation is a prototype. It should be refactored to store tokens in a # distributed DB so other SylkServer instances can access them. Also add some metadata # like the modification date so we know when a token was refreshed, and thus it's ok to # scrap it after a reasonable amount of time. class TokenStorage(object): __metaclass__ = Singleton def __init__(self): self._tokens = defaultdict(set) @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): return self._tokens[key] def add(self, account, token): self._tokens[account].add(token) self._save() def remove(self, account, token): self._tokens[account].discard(token) self._save() diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py index 635f854..ecf0282 100644 --- a/sylk/applications/webrtcgateway/web.py +++ b/sylk/applications/webrtcgateway/web.py @@ -1,181 +1,182 @@ + import json from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource from twisted.internet import reactor from twisted.web.server import Site from sylk import __version__ as sylk_version from sylk.applications.webrtcgateway import push from sylk.applications.webrtcgateway.configuration import GeneralConfig, JanusConfig from sylk.applications.webrtcgateway.factory import SylkWebSocketServerFactory from sylk.applications.webrtcgateway.janus.backend import JanusBackend from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.protocol import SYLK_WS_PROTOCOL from sylk.applications.webrtcgateway.storage import TokenStorage from sylk.resources import Resources from sylk.web import Klein, StaticFileResource, server __all__ = 'WebHandler', 'AdminWebHandler' class WebRTCGatewayWeb(object): __metaclass__ = Singleton app = Klein() _resource = None _ws_resource = None def __init__(self, ws_factory): self._ws_resource = WebSocketResource(ws_factory) def resource(self): if self._resource is None: self._resource = self.app.resource() return self._resource @app.route('/') def index(self, request): path = Resources.get('html/webrtcgateway/index.html') r = StaticFileResource(path) r.isLeaf = True return r @app.route('/ws') def ws(self, request): return self._ws_resource class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.web = WebRTCGatewayWeb(self.factory) server.register_resource('webrtcgateway', self.web.resource()) log.info('WebSocket handler started at %s' % ws_url) log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.info('Using Janus API: %s' % JanusConfig.api_url) self.backend = JanusBackend() self.backend.start() self.factory.backend = self.backend def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.dropConnection(abort=True) self.factory = None if self.backend is not None: self.backend.stop() self.backend = None # TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API # capabilities for other applications too. This could be done in a number of ways: # # * On the main web server, under a /admin/ parent route. # * On a separate web server, which could listen on a different IP and port. # # In either case, HTTPS aside, a token based authentication mechanism would be desired. # Which one is best is not 100% clear at this point. class AuthError(Exception): pass class AdminWebHandler(object): __metaclass__ = Singleton app = Klein() def __init__(self): self.listener = None def start(self): host, port = GeneralConfig.http_management_interface # noinspection PyUnresolvedReferences self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) log.info('Admin web handler started at http://%s:%d' % (host, port)) def stop(self): if self.listener is not None: self.listener.stopListening() self.listener = None # Admin web API def _check_auth(self, request): auth_secret = GeneralConfig.http_management_auth_secret if auth_secret: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if not auth_headers or auth_headers[0] != auth_secret: raise AuthError() @app.handle_errors(AuthError) def auth_error(self, request, failure): request.setResponseCode(403) return 'Authentication error' @app.route('/incoming_session', methods=['POST']) def incoming_session(self, request): self._check_auth(request) request.setHeader('Content-Type', 'application/json') try: data = json.load(request.content) originator = data['originator'] destination = data['destination'] except Exception as e: return json.dumps({'success': False, 'error': str(e)}) else: storage = TokenStorage() tokens = storage[destination] push.incoming_session(originator, destination, tokens) return json.dumps({'success': True}) @app.route('/missed_session', methods=['POST']) def missed_session(self, request): self._check_auth(request) request.setHeader('Content-Type', 'application/json') try: data = json.load(request.content) originator = data['originator'] destination = data['destination'] except Exception as e: return json.dumps({'success': False, 'error': str(e)}) else: storage = TokenStorage() tokens = storage[destination] push.missed_session(originator, destination, tokens) return json.dumps({'success': True}) @app.route('/tokens/') def get_tokens(self, request, account): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() tokens = storage[account] return json.dumps({'tokens': list(tokens)}) @app.route('/tokens//', methods=['POST', 'DELETE']) def process_token(self, request, account, token): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() if request.method == 'POST': storage.add(account, token) elif request.method == 'DELETE': storage.remove(account, token) return json.dumps({'success': True}) diff --git a/sylk/applications/xmppgateway/datatypes.py b/sylk/applications/xmppgateway/datatypes.py index 90c2759..f8f9a2f 100644 --- a/sylk/applications/xmppgateway/datatypes.py +++ b/sylk/applications/xmppgateway/datatypes.py @@ -1,153 +1,156 @@ import hashlib import random import string from application.python.descriptor import WriteOnceAttribute from sipsimple.core import BaseSIPURI, SIPURI, SIPCoreError from twisted.words.protocols.jabber.jid import JID sylkserver_prefix = hashlib.md5('sylkserver').hexdigest() + def generate_sylk_resource(): r = 'sylk-'+''.join(random.choice(string.ascii_letters+string.digits) for x in range(32)) return r.encode('hex') + def is_sylk_resource(r): if r.startswith('urn:uuid:') or len(r) != 74: return False try: decoded = r.decode('hex') except TypeError: return False else: return decoded.startswith('sylk-') + def encode_resource(r): return r.encode('utf-8').encode('hex') + def decode_resource(r): return r.decode('hex').decode('utf-8') class BaseURI(object): def __init__(self, user, host, resource=None): self.user = user self.host = host self.resource = resource @classmethod def parse(cls, value): if isinstance(value, BaseSIPURI): user = unicode(value.user) host = unicode(value.host) resource = unicode(value.parameters.get('gr', '')) or None return cls(user, host, resource) elif isinstance(value, JID): user = value.user host = value.host resource = value.resource return cls(user, host, resource) elif not isinstance(value, basestring): raise TypeError('uri needs to be a string') if not value.startswith(('sip:', 'sips:', 'xmpp:')): raise ValueError('invalid uri scheme for %s' % value) if value.startswith(('sip:', 'sips:')): try: uri = SIPURI.parse(value) except SIPCoreError: raise ValueError('invalid SIP uri: %s' % value) user = unicode(uri.user) host = unicode(uri.host) resource = unicode(uri.parameters.get('gr', '')) or None else: try: jid = JID(value[5:]) except Exception: raise ValueError('invalid XMPP uri: %s' % value) user = jid.user host = jid.host resource = jid.resource return cls(user, host, resource) @classmethod def new(cls, uri): if not isinstance(uri, BaseURI): raise TypeError('%s is not a valid URI type' % type(uri)) return cls(uri.user, uri.host, uri.resource) def as_sip_uri(self): uri = SIPURI(user=str(self.user), host=str(self.host)) if self.resource is not None: uri.parameters['gr'] = self.resource.encode('utf-8') return uri def as_xmpp_jid(self): return JID(tuple=(self.user, self.host, self.resource)) def __eq__(self, other): if isinstance(other, BaseURI): return self.user == other.user and self.host == other.host and self.resource == other.resource elif isinstance(other, basestring): try: other = BaseURI.parse(other) except ValueError: return False else: return self.user == other.user and self.host == other.host and self.resource == other.resource else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __repr__(self): return '%s(user=%r, host=%r, resource=%r)' % (self.__class__.__name__, self.user, self.host, self.resource) def __unicode__(self): return u'%s@%s' % (self.user, self.host) def __str__(self): return unicode(self).encode('utf-8') class URI(BaseURI): pass class FrozenURI(BaseURI): user = WriteOnceAttribute() host = WriteOnceAttribute() resource = WriteOnceAttribute() def __hash__(self): return hash((self.user, self.host, self.resource)) class Identity(object): def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name def __eq__(self, other): if isinstance(other, Identity): return self.uri == other.uri and self.display_name == other.display_name else: return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __unicode__(self): if self.display_name is not None: return u'%s <%s>' % (self.display_name, self.uri) else: return u'%s' % self.uri def __str__(self): return unicode(self).encode('utf-8') - diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index 87cfcb3..67d6888 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,442 +1,447 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlib import coros from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.session import Session -__all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] +__all__ = 'ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError' SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} + def _get_started(self): + return self.__dict__['started'] + def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() - def _get_started(self): - return self.__dict__['started'] + started = property(_get_started, _set_started) del _get_started, _set_started + def _get_xmpp_session(self): + return self.__dict__['xmpp_session'] + def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) NotificationCenter().add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) - def _get_xmpp_session(self): - return self.__dict__['xmpp_session'] + xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat') notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return self.msrp_stream = MediaStreamRegistry.get('chat')() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self) else: notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started')) def enqueue_xmpp_message(self, message): self._xmpp_message_queue.append(message) if self.started: self._send_queued_messages() def _send_queued_messages(self): sender = None while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = ChatIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', sender=sender, message_id=str(message.id), notify_progress=message.use_receipt) if sender: self.msrp_stream.send_composing_indication('idle', 30, sender=sender) def _inactivity_timeout(self): log.info("Ending SIP session %s due to inactivity" % self.sip_session.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP session %s started" % self.sip_session.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.info("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP session %s failed" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.content else: html_body = message.content body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: notify_progress = False else: notify_progress = True self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', sender=sender, message_id=str(message.id), notify_progress=notify_progress) self.msrp_stream.send_composing_indication('idle', 30, sender=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, sender=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] + class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason + class SIPMessageSender(object): implements(IObserver) def __init__(self, message): # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client # replies to one of our messages. MESSAGE requests don't need a Contact header, though # so how should we communicate our GRUU to the recipient? self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.uri) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) - diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index fab365c..99df9c6 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,326 +1,326 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.audio import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import FromHeader, ToHeader from sipsimple.core import SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.session import Session -__all__ = ['MediaSessionHandler'] +__all__ = 'MediaSessionHandler', class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) self.sip_session.connect(from_header, to_header, route=route, streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP session %s started" % self.sip_session.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.info("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP session %s failed (%s)" % (self.sip_session.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() def _NH_SIPSessionGotConferenceInfo(self, notification): self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) def _NH_JingleSessionDidStart(self, notification): log.info("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.info("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.info("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py index 52eab6f..78c8626 100644 --- a/sylk/applications/xmppgateway/muc.py +++ b/sylk/applications/xmppgateway/muc.py @@ -1,465 +1,466 @@ import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatStreamError, ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS from sylk.configuration import SIPConfig from sylk.session import Session class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class MucInvitationFailure(object): def __init__(self, code, reason): self.code = code self.reason = reason + def __str__(self): return '%s (%s)' % (self.code, self.reason) class X2SMucInvitationHandler(object): implements(IObserver) def __init__(self, sender, recipient, participant): self.sender = sender self.recipient = recipient self.participant = participant self.active = False self.route = None self._channel = coros.queue() self._referral = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() sender_uri = self.sender.uri.as_sip_uri() recipient_uri = self.recipient.uri.as_sip_uri() participant_uri = self.participant.uri.as_sip_uri() try: # Lookup routes account = DefaultAccount() if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(recipient_uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise ReferralError(error='DNS lookup failed: %s' % e) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) refer_to_header = ReferToHeader(str(participant_uri)) refer_to_header.parameters['method'] = 'INVITE' referral = Referral(recipient_uri, FromHeader(sender_uri), ToHeader(recipient_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) finally: self.active = False except ReferralError as e: self._failure = MucInvitationFailure(e.code, e.error) finally: notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._referral = None if self._failure is not None: notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self) def _refresh(self): account = DefaultAccount() transport = self.route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self._referral.refresh(contact_header=contact_header, timeout=2) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() class S2XMucInvitationHandler(object): implements(IObserver) def __init__(self, session, sender, recipient, inviter): self.session = session self.sender = sender self.recipient = recipient self.inviter = inviter self._timer = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._timer = reactor.callLater(90, self._timeout) notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self) def stop(self): if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None notification_center = NotificationCenter() if self.session is not None: notification_center.remove_observer(self, sender=self.session) reactor.callLater(5, self._end_session, self.session) self.session = None if self._failure is not None: notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self) def _end_session(self, session): try: session.end(480) except Exception: pass def _timeout(self): NotificationCenter().remove_observer(self, sender=self.session) try: self.session.end(408) except Exception: pass self.session = None self._failure = MucInvitationFailure('Timeout', 408) self.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidFail(self, notification): notification.center.remove_observer(self, sender=self.session) self.session = None self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code) self.stop() class X2SMucHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity, nickname): self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.nickname = nickname self._xmpp_muc_session = None self._sip_session = None self._msrp_stream = None self._first_stanza = None self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza self._participants = set() # set of (URI, nickname) tuples self.ended = False def start(self): notification_center = NotificationCenter() self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session.start() notification_center.post_notification('X2SMucHandlerDidStart', sender=self) self._start_sip_session() def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_muc_session is not None: notification_center.remove_observer(self, sender=self._xmpp_muc_session) # Send indication that the user has been kicked from the room sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('307') xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._xmpp_muc_session.end() self._xmpp_muc_session = None if self._sip_session is not None: notification_center.remove_observer(self, sender=self._sip_session) self._sip_session.end() self._sip_session = None self.ended = True notification_center.post_notification('X2SMucHandlerDidEnd', sender=self) @run_in_green_thread def _start_sip_session(self): # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = self.sip_identity.uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) self.end() return self._msrp_stream = MediaStreamRegistry.get('chat')() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self._sip_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._sip_session) notification_center.add_observer(self, sender=self._msrp_stream) self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP multiparty session %s started" % self._sip_session.call_id) if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: self.end() return message_id = self._msrp_stream.set_local_nickname(self.nickname) self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) self._first_stanza = None def _NH_SIPSessionDidEnd(self, notification): log.info("SIP multiparty session %s ended" % self._sip_session.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP multiparty session %s failed" % self._sip_session.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self._sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self._sip_session.reject_transfer(403) def _NH_SIPSessionGotConferenceInfo(self, notification): # Translate to XMPP payload xmpp_manager = XMPPManager() own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) conference_info = notification.data.conference_info new_participants = set() for user in conference_info.users: user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) nickname = user.display_text.value if user.display_text else user.entity new_participants.add((user_uri, nickname)) # Remove participants that are no longer in the room for uri, nickname in self._participants - new_participants: sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) xmpp_manager.send_muc_stanza(stanza) # Send presence for current participants for uri, nickname in new_participants: if uri == own_uri: continue sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = Identity(uri) xmpp_manager.send_muc_stanza(stanza) self._participants = new_participants # Send own status last sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('110') xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream if not self._xmpp_muc_session: return message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.content else: html_body = message.content body = None resource = message.sender.display_name or str(message.sender.uri) sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex) self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') def _NH_ChatStreamDidSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) self.nickname = nickname def _NH_ChatStreamDidNotSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)]) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(error_stanza) def _NH_ChatStreamDidDeliverMessage(self, notification): # Echo back the message to the sender stanza = self._pending_messages_map.pop(notification.data.message_id) stanza.sender, stanza.recipient = stanza.recipient, stanza.sender stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamDidNotDeliverMessage(self, notification): self._pending_messages_map.pop(notification.data.message_id) def _NH_XMPPIncomingMucSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session = None self.end() def _NH_XMPPIncomingMucSessionGotMessage(self, notification): if not self._sip_session: return message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri, display_name=self.nickname) message_id = self._msrp_stream.send_message(message.body, 'text/plain', sender=sender) self._pending_messages_map[message_id] = message # Message will be echoed back to the sender on ChatStreamDidDeliverMessage def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): if not self._sip_session: return nickname = notification.data.nickname try: message_id = self._msrp_stream.set_local_nickname(nickname) except ChatStreamError: return self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index db3a3a8..68e58a1 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,520 +1,521 @@ import hashlib import random from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid, caps from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription from sylk.configuration import SIPConfig -__all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] +__all__ = 'S2XPresenceHandler', 'X2SPresenceHandler' class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self) def add_sip_subscription(self, subscription): # If s subscription is received after the handle has ended but before # S2XPresenceHandlerDidEnd has been processed we need to ignore it and wait for a retransmission # which we will handle by creating a new S2XPresenceHandler if self.ended: return self._sip_subscriptions.append(subscription) NotificationCenter().add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None try: subscription.accept(content_type, pidf_doc) except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() else: try: subscription.accept_pending() except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = next(self._stanza_cache.iterkeys()) person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': status.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: status.extended = 'available' if stanza.sender.uri.resource: resource = encode_resource(stanza.sender.uri.resource) else: # Workaround for clients not sending the resource under certain (unknown) circumstances resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest() service_id = "SID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource sip_uri.parameters['xmpp'] = None contact = pidf.Contact(str(sip_uri)) service = pidf.Service(service_id, status=status, contact=contact) service.add(pidf.DeviceID(resource)) service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource) service.timestamp = pidf.ServiceTimestamp(stanza.timestamp) service.capabilities = caps.ServiceCapabilities(text=True, message=True) for lang, note in stanza.statuses.iteritems(): service.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(service) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification.center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) if not self._sip_subscriptions: self.end() def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification): if XMPPGatewayConfig.log_presence: log.info('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), notification.data.code, notification.data.reason)) def _NH_SIPIncomingSubscriptionGotUnsubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s was terminated by user for presence flow 1x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionGotRefreshingSubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s was refreshed for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionDidTimeout(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s timed out for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza stanza.timestamp = ISOTimestamp.now() # TODO: mirror the one in the stanza, if present pidf_doc = self._build_pidf() if XMPPGatewayConfig.log_presence: log.info('XMPP notification from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self))) for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) except SIPCoreError as e: if XMPPGatewayConfig.log_presence: log.info('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e)) if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification.center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass - class TerminateSubscription(Exception): pass + class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal + class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data + class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) except ParserError as e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = next(p for p in pidf_doc.persons) except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id else: sip_contact.parameters['gr'] = service.id sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' elif {'holiday', 'vacation'}.intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = DefaultAccount() refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.uri), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if self._xmpp_subscription is None: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: if XMPPGatewayConfig.log_presence: log.info('SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'))) self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail as e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) except SubscriptionError as e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe - diff --git a/sylk/applications/xmppgateway/util.py b/sylk/applications/xmppgateway/util.py index a879317..7f5f2e2 100644 --- a/sylk/applications/xmppgateway/util.py +++ b/sylk/applications/xmppgateway/util.py @@ -1,30 +1,32 @@ import lxml.html import lxml.html.clean -__all__ = ['html2text', 'text2html', 'format_uri'] + +__all__ = 'html2text', 'text2html', 'format_uri' def html2text(data): try: doc = lxml.html.document_fromstring(data) cleaner = lxml.html.clean.Cleaner(style=True) doc = cleaner.clean_html(doc) return doc.text_content().strip('\n') except Exception: return '' xhtml_im_template = """ %(data)s """ + def text2html(data): return xhtml_im_template % {'data': data} def format_uri(uri, scheme=''): return '%s%s@%s' % ('' if not scheme else scheme+':', uri.user, uri.host) diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py index dfb034f..0828038 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/__init__.py @@ -1,120 +1,123 @@ """ This module automatically registers media streams to a stream registry allowing for a plug and play mechanism of various types of media negoticated in a SIP session that can be added to this library by using a generic API. For actual usage see rtp.py and msrp.py that implement media streams based on their respective RTP and MSRP protocols. """ - from operator import attrgetter from application.python.types import Singleton from zope.interface import Interface, Attribute class StreamError(Exception): pass class InvalidStreamError(StreamError): pass class UnknownStreamError(StreamError): pass # The MediaStream interface # class IMediaStream(Interface): type = Attribute("A string identifying the stream type (ex: audio, video, ...)") priority = Attribute("An integer value indicating the stream priority relative to the other streams types (higher numbers have higher priority).") session = Attribute("Session object to which this stream is attached") hold_supported = Attribute("True if the stream supports hold") on_hold_by_local = Attribute("True if the stream is on hold by the local party") on_hold_by_remote = Attribute("True if the stream is on hold by the remote") on_hold = Attribute("True if either on_hold_by_local or on_hold_by_remote is true") # this should be a classmethod, but zopeinterface complains if we decorate it with @classmethod -Dan def new_from_sdp(cls, session, remote_sdp, stream_index): pass def get_local_media(self, for_offer): pass def initialize(self, session, direction): pass def start(self, local_sdp, remote_sdp, stream_index): pass def deactivate(self): pass def end(self): pass def validate_update(self, remote_sdp, stream_index): pass def update(self, local_sdp, remote_sdp, stream_index): pass def hold(self): pass def unhold(self): pass def reset(self, stream_index): pass + # The MediaStream registry # class StreamDescriptor(object): def __init__(self, type): self.type = type + def __get__(self, obj, objtype): return self if obj is None else obj.get(self.type) + def __set__(self, obj, value): raise AttributeError('cannot set attribute') + def __delete__(self, obj): raise AttributeError('cannot delete attribute') class MediaStreamRegistry(object): __metaclass__ = Singleton def __init__(self): self.__types__ = [] def __iter__(self): return iter(self.__types__) def add(self, cls): if cls.priority is not None and cls not in self.__types__: self.__types__.append(cls) self.__types__.sort(key=attrgetter('priority'), reverse=True) setattr(self.__class__, cls.type.title().translate(None, ' -_') + 'Stream', StreamDescriptor(cls.type)) def get(self, type): try: return next(cls for cls in self.__types__ if cls.type == type) except StopIteration: raise UnknownStreamError("unknown stream type: %s" % type) MediaStreamRegistry = MediaStreamRegistry() class MediaStreamRegistrar(type): """Metaclass for adding a MediaStream to the media stream's class registry""" def __init__(cls, name, bases, dic): super(MediaStreamRegistrar, cls).__init__(name, bases, dic) MediaStreamRegistry.add(cls) # Import the streams defined in submodules # from sylk.applications.xmppgateway.xmpp.jingle.streams import rtp from sylk.applications.xmppgateway.xmpp.jingle.streams.rtp import * -__all__ = ['StreamError', 'InvalidStreamError', 'UnknownStreamError', 'IMediaStream', 'MediaStreamRegistry', 'MediaStreamRegistrar'] + rtp.__all__ +__all__ = ('StreamError', 'InvalidStreamError', 'UnknownStreamError', 'IMediaStream', 'MediaStreamRegistry', 'MediaStreamRegistrar') + rtp.__all__ diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py index f66fb7d..edeea9f 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py @@ -1,439 +1,440 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19. """ -__all__ = ['AudioStream'] - from threading import RLock from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from zope.interface import implements from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError from sipsimple.streams.rtp import RTPStreamEncryption from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError +__all__ = 'AudioStream', + + class AudioStream(object): __metaclass__ = MediaStreamRegistrar implements(IMediaStream, IAudioPort, IObserver) type = 'audio' priority = 1 hold_supported = True def __init__(self): from sipsimple.application import SIPApplication self.mixer = SIPApplication.voice_audio_mixer self.bridge = AudioBridge(self.mixer) self.device = AudioDevice(self.mixer) self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = 'NULL' self._transport = None self._hold_request = None self._ice_state = 'NULL' self._lock = RLock() self._rtp_transport = None self.session = None self.encryption = RTPStreamEncryption(self) self._srtp_encryption = None self._try_ice = False self._initialized = False self._done = False self._failure_reason = None self.bridge.add(self.device) # Audio properties # @property def codec(self): return self._transport.codec if self._transport else None @property def consumer_slot(self): return self._transport.slot if self._transport else None @property def producer_slot(self): return self._transport.slot if self._transport and not self.muted else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None def _get_muted(self): return self.__dict__.get('muted', False) def _set_muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for muted property: %r' % (value,)) if value == self.muted: return old_producer_slot = self.producer_slot self.__dict__['muted'] = value notification_center = NotificationCenter() data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot) notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data) muted = property(_get_muted, _set_muted) del _get_muted, _set_muted # RTP properties # @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None @property def local_rtp_candidate_type(self): return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None @property def remote_rtp_candidate_type(self): return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None @property def ice_active(self): return self._ice_state == 'IN_USE' # Generic properties # @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote # Public methods # @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] if remote_stream.media != 'audio': raise UnknownStreamError if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'): raise InvalidStreamError('expected RTP/AVP or RTP/SAVP transport in audio stream, got %s' % remote_stream.transport) local_encryption_policy = 'sdes_optional' if local_encryption_policy == 'sdes_mandatory' and not 'crypto' in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") if remote_stream.transport == 'RTP/SAVP' and 'crypto' in remote_stream.attributes and local_encryption_policy not in ('opportunistic', 'sdes_optional', 'sdes_mandatory'): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError('no compatible codecs found') stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index if 'zrtp-hash' in remote_stream.attributes: stream._incoming_stream_encryption = 'zrtp' elif 'crypto' in remote_stream.attributes: stream._incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: stream._incoming_stream_encryption = None return stream def initialize(self, session, direction): with self._lock: if self.state != 'NULL': raise RuntimeError('AudioStream.initialize() may only be called in the NULL state') self.state = 'INITIALIZING' self.session = session local_encryption_policy = 'sdes_optional' if hasattr(self, '_incoming_remote_sdp'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates if self._incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = self._incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy del self._incoming_stream_encryption else: self._try_ice = True self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ['INITIALIZED', 'WAIT_ICE', 'ESTABLISHED']: raise RuntimeError('AudioStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states') if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = 'sendrecv' elif 'send' in old_direction: new_direction = ('sendonly' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'sendrecv') else: new_direction = ('inactive' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'recvonly') else: new_direction = None return self._transport.get_local_media(remote_sdp, index, new_direction) def start(self, local_sdp, remote_sdp, stream_index): with self._lock: if self.state != 'INITIALIZED': raise RuntimeError('AudioStream.start() may only be called in the INITIALIZED state') settings = SIPSimpleSettings() self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self._check_hold(self._transport.direction, True) if self._try_ice: self.state = 'WAIT_ICE' else: self.state = 'ESTABLISHED' self.notification_center.post_notification('MediaStreamDidStart', sender=self) def validate_update(self, remote_sdp, stream_index): with self._lock: # TODO: implement return True def update(self, local_sdp, remote_sdp, stream_index): with self._lock: connection = remote_sdp.media[stream_index].connection or remote_sdp.connection if not self._rtp_transport.ice_active and (connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port): settings = SIPSimpleSettings() old_consumer_slot = self.consumer_slot old_producer_slot = self.producer_slot self.notification_center.remove_observer(self, sender=self._transport) self._transport.stop() try: self._transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) except SIPCoreError as e: self.state = 'ENDED' self._failure_reason = e.args[0] self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=self._failure_reason)) return self.notification_center.add_observer(self, sender=self._transport) self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True, old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv': self._transport.update_direction('recvonly') self._check_hold(self._transport.direction, False) self.notification_center.post_notification('RTPStreamDidChangeRTPParameters', sender=self) else: new_direction = local_sdp.media[stream_index].direction self._transport.update_direction(new_direction) self._check_hold(new_direction, False) self._hold_request = None def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == 'ESTABLISHED' and self.direction != 'inactive': self.bridge.remove(self) self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == 'ESTABLISHED' and self._hold_request == 'hold': self.bridge.add(self) self._hold_request = None if self._hold_request == 'hold' else 'unhold' def deactivate(self): with self._lock: self.bridge.stop() def end(self): with self._lock: if not self._initialized or self._done: return self._done = True self.notification_center.post_notification('MediaStreamWillEnd', sender=self) if self._transport is not None: self._transport.stop() self.notification_center.remove_observer(self, sender=self._transport) self._transport = None self.notification_center.remove_observer(self, sender=self._rtp_transport) self._rtp_transport = None self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) self.session = None def reset(self, stream_index): with self._lock: if self.direction == 'inactive' and not self.on_hold_by_local: new_direction = 'sendrecv' self._transport.update_direction(new_direction) self._check_hold(new_direction, False) # TODO: do a full reset, re-creating the AudioTransport, so that a new offer # would contain all codecs and ICE would be renegotiated -Saul def send_dtmf(self, digit): with self._lock: if self.state != 'ESTABLISHED': raise RuntimeError('AudioStream.send_dtmf() cannot be used in %s state' % self.state) try: self._transport.send_dtmf(digit) except PJSIPError as e: if not e.args[0].endswith('(PJ_ETOOMANY)'): raise # Notification handling # def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPTransportDidFail(self, notification): with self._lock: self.notification_center.remove_observer(self, sender=notification.sender) if self.state == 'ENDED': return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportDidInitialize(self, notification): settings = SIPSimpleSettings() rtp_transport = notification.sender with self._lock: if self.state == 'ENDED': return del self._rtp_args del self._stun_servers try: if hasattr(self, '_incoming_remote_sdp'): try: audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) finally: del self._incoming_remote_sdp del self._incoming_stream_index else: audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) except SIPCoreError as e: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = audio_transport self.notification_center.add_observer(self, sender=audio_transport) self._initialized = True self.state = 'INITIALIZED' self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPAudioStreamGotDTMF(self, notification): self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit)) def _NH_RTPAudioTransportDidTimeout(self, notification): self.notification_center.post_notification('RTPStreamDidTimeout', sender=self) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != 'NULL' or self.state not in ('INITIALIZING', 'INITIALIZED', 'WAIT_ICE'): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'IN_USE' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'FAILED' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods # def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args['encryption'] = self._srtp_encryption self._rtp_args['use_ice'] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() rtp_transport = None try: rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) self.notification_center.add_observer(self, sender=rtp_transport) rtp_transport.set_INIT() except SIPCoreError as e: if rtp_transport is not None: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _check_hold(self, direction, is_initial): was_on_hold_by_local = self.on_hold_by_local was_on_hold_by_remote = self.on_hold_by_remote was_inactive = self.direction == 'inactive' self.direction = direction inactive = self.direction == 'inactive' self.on_hold_by_local = was_on_hold_by_local if inactive else direction == 'sendonly' self.on_hold_by_remote = 'send' not in direction if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold': self.bridge.add(self) if not was_on_hold_by_local and self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=True)) if was_on_hold_by_local and not self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=False)) if not was_on_hold_by_remote and self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=True)) if was_on_hold_by_remote and not self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=False)) diff --git a/sylk/applications/xmppgateway/xmpp/jingle/util.py b/sylk/applications/xmppgateway/xmpp/jingle/util.py index 7dd946b..b232a2d 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/util.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/util.py @@ -1,153 +1,155 @@ import re from collections import defaultdict from itertools import count from sipsimple.core import SDPSession, SDPMediaStream, SDPAttribute, SDPConnection from sylk.applications.xmppgateway.xmpp.stanzas import jingle -__all__ = ['jingle_to_sdp', 'sdp_to_jingle'] +__all__ = 'jingle_to_sdp', 'sdp_to_jingle' # IPv4 only for now, I'm sorry ipv4_re = re.compile("^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}$") + def content_to_sdpstream(content): if content.description is None or content.transport is None: raise ValueError media_stream = SDPMediaStream(str(content.description.media), 0, 'RTP/AVP') formats = [] attributes = [] for item in content.description.payloads: formats.append(item.id) attributes.append(SDPAttribute('rtpmap', '%d %s/%d' % (item.id, str(item.name), item.clockrate))) if item.maxptime: attributes.append(SDPAttribute('maxptime', str(item.maxptime))) if item.ptime: attributes.append(SDPAttribute('ptime', str(item.ptime))) if item.parameters: parameters_str = ';'.join(('%s=%s' % (p.name, p.value) for p in item.parameters)) attributes.append(SDPAttribute('fmtp', '%d %s' % (item.id, str(parameters_str)))) media_stream.formats = map(str, formats) media_stream.attributes = attributes # set attributes so that _codec_list is generated if content.description.encryption: if content.description.encryption.required: media_stream.transport = 'RTP/SAVP' for crypto in content.description.encryption.cryptos: crypto_str = '%s %s %s' % (crypto.tag, crypto.crypto_suite, crypto.key_params) if crypto.session_params: crypto_str += ' %s' % crypto.session_params media_stream.attributes.append(SDPAttribute('crypto', str(crypto_str))) if isinstance(content.transport, jingle.IceUdpTransport): if content.transport.ufrag: media_stream.attributes.append(SDPAttribute('ice-ufrag', str(content.transport.ufrag))) if content.transport.password: media_stream.attributes.append(SDPAttribute('ice-pwd', str(content.transport.password))) for candidate in content.transport.candidates: if not ipv4_re.match(candidate.ip): continue candidate_str = '%s %d %s %d %s %d typ %s' % (candidate.foundation, candidate.component, candidate.protocol.upper(), candidate.priority, candidate.ip, candidate.port, candidate.typ) if candidate.related_addr and candidate.related_port: candidate_str += ' raddr %s rport %d' % (candidate.related_addr, candidate.related_port) media_stream.attributes.append(SDPAttribute('candidate', str(candidate_str))) if content.transport.remote_candidate: remote_candidate = content.transport.remote_candidate remote_candidates_str = '%d %s %d' % (remote_candidate.component, remote_candidate.ip, remote_candidate.port) media_stream.attributes.append(SDPAttribute('remote-candidates', str(remote_candidates_str))) elif isinstance(content.transport, jingle.RawUdpTransport): # Nothing to do here pass else: raise ValueError # Set the proper connection information, pick the first RTP candidate and use that try: candidate = next(c for c in content.transport.candidates if c.component == 1 and ipv4_re.match(c.ip)) except StopIteration: raise ValueError media_stream.connection = SDPConnection(str(candidate.ip)) media_stream.port = candidate.port return media_stream def jingle_to_sdp(payload): sdp = SDPSession('127.0.0.1') for c in payload.content: try: media_stream = content_to_sdpstream(c) except ValueError: continue sdp.media.append(media_stream) return sdp ice_candidate_re = re.compile(r"""^(?P[a-zA-Z0-9+/]+) (?P\d+) (?P[a-zA-Z]+) (?P\d+) (?P[0-9a-fA-F.:]+) (?P\d+) typ (?P[a-zA-Z]+)(?: raddr (?P[0-9a-fA-F.:]+) rport (?P\d+))?$""", re.MULTILINE) crypto_re = re.compile(r"""^(?P\d+) (?P[a-zA-Z0-9\_]+) (?P[a-zA-Z0-9\:\+]+)(?: (?P[a-zA-Z0-9\:\+]+))?$""", re.MULTILINE) + def sdpstream_to_content(sdp, index): media_stream = sdp.media[index] content = jingle.Content('initiator', media_stream.media) content.description = jingle.RTPDescription(media=media_stream.media) try: ptime = next(attr.value for attr in media_stream.attributes if attr.name=='ptime') except StopIteration: ptime = None try: maxptime = next(attr.value for attr in media_stream.attributes if attr.name=='maxptime') except StopIteration: maxptime = None rtp_mappings = media_stream.rtp_mappings.copy() MediaCodec = rtp_mappings[0].__class__ rtpmap_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='rtpmap') rtpmap_codecs = dict([(int(type), MediaCodec(name, rate)) for type, name, rate in media_stream.rtpmap_re.findall(rtpmap_lines)]) rtp_mappings.update(rtpmap_codecs) for item in media_stream.formats: codec = rtp_mappings.get(int(item), None) if codec is not None: pt = jingle.PayloadType(int(item), codec.name, codec.rate, 1, ptime=ptime, maxptime=maxptime) for attr in (attr for attr in media_stream.attributes if attr.name=='fmtp' and attr.value.startswith(item)): value = attr.value.split(' ', 1)[1] for v in value.split(';'): fmtp_name, sep, fmtp_value = v.partition('=') pt.parameters.append(jingle.Parameter(fmtp_name, fmtp_value)) content.description.payloads.append(pt) content.description.encryption = jingle.Encryption(required=media_stream.transport=='RTP/SAVP') crypto_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='crypto') for tag, suite, key_params, session_params in crypto_re.findall(crypto_lines): content.description.encryption.cryptos.append(jingle.Crypto(suite, key_params, tag, session_params)) if media_stream.has_ice_candidates: foundation_counter = count(1) foundation_map = defaultdict(foundation_counter.next) id_counter = count(100) if not media_stream.has_ice_attributes and not sdp.has_ice_attributes: raise ValueError ufrag_attr = next(attr for attr in media_stream.attributes+sdp.attributes if attr.name=='ice-ufrag') pwd_attr = next(attr for attr in media_stream.attributes+sdp.attributes if attr.name=='ice-pwd') content.transport = jingle.IceUdpTransport(ufrag=ufrag_attr.value, pwd=pwd_attr.value) candidate_lines = '\n'.join(attr.value for attr in media_stream.attributes if attr.name=='candidate') for foundation, component, protocol, priority, ip, port, type, raddr, rport in ice_candidate_re.findall(candidate_lines): candidate = jingle.ICECandidate(component, foundation_map[foundation], 0, next(id_counter), ip, 0, port, priority, protocol.lower(), type, raddr or None, rport or None) content.transport.candidates.append(candidate) # TODO: translate remote-candidate else: content.transport = jingle.RawUdpTransport() connection = media_stream.connection or sdp.connection if not connection: raise ValueError content.transport.candidates.append(jingle.UDPCandidate(1, 0, 100, connection.address, media_stream.port, 'UDP')) # TODO: component for RTCP return content def sdp_to_jingle(sdp): payload = jingle.Jingle(None, None) # action and sid will be filled up by the session for index, media_stream in enumerate(sdp.media): try: content = sdpstream_to_content(sdp, index) except ValueError: continue payload.content.append(content) return payload diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index cabc1ac..56205d9 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,380 +1,380 @@ from application.notification import NotificationCenter, NotificationData from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.jid import JID from wokkel import disco, muc, ping, xmppim from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI -from sylk.applications.xmppgateway.xmpp.stanzas import (RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza, - NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, - AvailabilityPresence, SubscriptionPresence, ProbePresence, - MUCAvailabilityPresence, GroupChatMessage, IncomingInvitationMessage) +from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, MUC_USER_NS, ErrorStanza, ChatComposingIndication +from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, GroupChatMessage, IncomingInvitationMessage, NormalMessage, MessageReceipt +from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence, SubscriptionPresence, ProbePresence, MUCAvailabilityPresence from sylk.applications.xmppgateway.xmpp.stanzas import jingle -__all__ = ['DiscoProtocol', 'JingleProtocol', 'MessageProtocol', 'MUCServerProtocol', 'MUCPresenceProtocol', 'PresenceProtocol'] + +__all__ = 'DiscoProtocol', 'JingleProtocol', 'MessageProtocol', 'MUCServerProtocol', 'MUCPresenceProtocol', 'PresenceProtocol' class MessageProtocol(xmppim.MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) msg_type = msg.getAttribute('type') msg_id = msg.getAttribute('id', None) is_empty = msg.body is None and msg.html is None if msg_type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.elements()] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg_id) notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message)) return if msg_type in (None, 'normal', 'chat') and not is_empty: body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: use_receipt = False else: use_receipt = elem.name == u'request' if msg_type == 'chat': message = ChatMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, html_body, id=msg_id, use_receipt=use_receipt) notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message)) return # Check if it's a composing indication if msg_type == 'chat' and is_empty: for elem in msg.elements(): try: elem = next(c for c in msg.elements() if c.uri == CHATSTATES_NS) except StopIteration: pass else: composing_indication = ChatComposingIndication(sender, recipient, elem.name, id=msg_id) notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if is_empty: try: elem = next(c for c in msg.elements() if c.uri == RECEIPTS_NS) except StopIteration: pass else: if elem.name == u'received' and msg_id is not None: receipt = MessageReceipt(sender, recipient, msg_id) notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt)) class PresenceProtocol(xmppim.PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) NotificationCenter().post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) NotificationCenter().post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class MUCServerProtocol(xmppim.BasePresenceProtocol): messageTypes = None, 'normal', 'chat', 'groupchat' presenceTypeParserMap = {'available': muc.UserPresence, 'unavailable': muc.UserPresence} def connectionInitialized(self): self.xmlstream.addObserver('/presence/x[@xmlns="%s"]' % muc.NS_MUC, self._onPresence) self.xmlstream.addObserver('/message', self._onMessage) def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType == 'error': return if messageType not in self.messageTypes: message['type'] = 'normal' if messageType == 'groupchat': self.onGroupChat(message) else: to_uri = FrozenURI.parse('xmpp:'+message['to']) if to_uri.host in self.parent.domains: # Check if it's an invitation if message.x is not None and message.x.invite is not None and message.x.invite.uri == MUC_USER_NS: self.onInvitation(message) else: # TODO: give error, private messages not supported pass def onGroupChat(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message)) def onInvitation(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) invited_user_uri = FrozenURI.parse('xmpp:'+msg.x.invite['to']) invited_user = Identity(invited_user_uri) if msg.x.invite.reason is not None and msg.x.invite.reason.uri == MUC_USER_NS: reason = unicode(msg.x.invite.reason) else: reason = None invitation = IncomingInvitationMessage(sender, recipient, invited_user=invited_user, reason=reason, id=msg.getAttribute('id', None)) NotificationCenter().post_notification('XMPPMucGotInvitation', sender=self.parent, data=NotificationData(invitation=invitation)) def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) NotificationCenter().post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class DiscoProtocol(disco.DiscoHandler): def info(self, requestor, target, nodeIdentifier): """ Gather data for a disco info request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager if target.host not in xmpp_manager.domains | xmpp_manager.muc_domains: return defer.fail(StanzaError('service-unavailable')) elements = [disco.DiscoFeature(disco.NS_DISCO_INFO), disco.DiscoFeature(disco.NS_DISCO_ITEMS), disco.DiscoFeature('http://sylkserver.com')] if target.host in xmpp_manager.muc_domains: elements.append(disco.DiscoIdentity('conference', 'text', 'SylkServer Chat Service')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/muc')) elements.append(disco.DiscoFeature('urn:ietf:rfc:3264')) elements.append(disco.DiscoFeature('urn:xmpp:coin')) elements.append(disco.DiscoFeature(jingle.NS_JINGLE)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_AUDIO)) #elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_VIDEO)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_ICE_UDP_TRANSPORT)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_RAW_UDP_TRANSPORT)) if target.user: # We can't say much more here, because the actual conference may end up on a different server elements.append(disco.DiscoFeature('muc_temporary')) elements.append(disco.DiscoFeature('muc_unmoderated')) else: elements.append(disco.DiscoFeature(ping.NS_PING)) if not target.user: elements.append(disco.DiscoIdentity('gateway', 'simple', 'SylkServer')) elements.append(disco.DiscoIdentity('server', 'im', 'SylkServer')) else: elements.append(disco.DiscoIdentity('client', 'pc')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/caps')) elements.append(disco.DiscoFeature('http://jabber.org/protocol/chatstates')) elements.append(disco.DiscoFeature('urn:ietf:rfc:3264')) elements.append(disco.DiscoFeature('urn:xmpp:coin')) elements.append(disco.DiscoFeature(jingle.NS_JINGLE)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_AUDIO)) #elements.append(disco.DiscoFeature(jingle.NS_JINGLE_APPS_RTP_VIDEO)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_ICE_UDP_TRANSPORT)) elements.append(disco.DiscoFeature(jingle.NS_JINGLE_RAW_UDP_TRANSPORT)) return defer.succeed(elements) def items(self, requestor, target, nodeIdentifier): """ Gather data for a disco items request. @param requestor: The entity that sent the request. @type requestor: L{JID} @param target: The entity the request was sent to. @type target: L{JID} @param nodeIdentifier: The optional node being queried, or C{''}. @type nodeIdentifier: C{unicode} @return: Deferred with the gathered results from sibling handlers. @rtype: L{defer.Deferred} """ xmpp_manager = self.parent.manager items = [] if not target.user and target.host in xmpp_manager.domains: items.append(disco.DiscoItem(JID('%s.%s' % (XMPPGatewayConfig.muc_prefix, target.host)), name='Multi-User Chat')) return defer.succeed(items) class JingleProtocol(jingle.JingleHandler): # Functions here need to return immediately so that the IQ result is sent, so schedule them in the reactor # TODO: review and remove this, just post notifications? def onSessionInitiate(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionInitiate', sender=self.parent, data=NotificationData(stanza=request, protocol=self)) def onSessionTerminate(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionTerminate', sender=self.parent, data=NotificationData(stanza=request)) def onSessionAccept(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionAccept', sender=self.parent, data=NotificationData(stanza=request)) def onSessionInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleSessionInfo', sender=self.parent, data=NotificationData(stanza=request)) def onDescriptionInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleDescriptionInfo', sender=self.parent, data=NotificationData(stanza=request)) def onTransportInfo(self, request): reactor.callLater(0, NotificationCenter().post_notification, 'XMPPGotJingleTransportInfo', sender=self.parent, data=NotificationData(stanza=request)) class MUCPresenceProtocol(xmppim.PresenceProtocol): """Protocol implementation to handle presence subscription to MUC URIs """ def subscribeReceived(self, stanza): """ Subscription request was received. """ self.subscribed(stanza.sender, sender=stanza.recipient) self.send_available(stanza) def unsubscribeReceived(self, stanza): """ Unsubscription request was received. """ self.unsubscribed(stanza.sender, sender=stanza.recipient) def probeReceived(self, stanza): """ Probe presence was received. """ self.send_available(stanza) def send_available(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) available = AvailabilityPresence(sender=recipient, recipient=sender) self.send(available.to_xml_element()) diff --git a/sylk/applications/xmppgateway/xmpp/server.py b/sylk/applications/xmppgateway/xmpp/server.py index afef023..28bc339 100644 --- a/sylk/applications/xmppgateway/xmpp/server.py +++ b/sylk/applications/xmppgateway/xmpp/server.py @@ -1,116 +1,116 @@ from application.notification import NotificationCenter, NotificationData from twisted.internet import defer, reactor from twisted.words.protocols.jabber import error, xmlstream from twisted.words.protocols.jabber.jid import internJID from wokkel.component import InternalComponent, Router from wokkel.server import XMPPS2SServerFactory, DeferredS2SClientFactory -__all__ = ['SylkRouter', 'SylkInternalComponent'] +__all__ = 'SylkRouter', 'SylkInternalComponent' class SylkInternalComponent(InternalComponent): def __init__(self, *args, **kwargs): InternalComponent.__init__(self, *args, **kwargs) self._iqDeferreds = {} def startService(self): InternalComponent.startService(self) self.xmlstream.addObserver('/iq[@type="result"]', self._onIQResponse) self.xmlstream.addObserver('/iq[@type="error"]', self._onIQResponse) def stopService(self): InternalComponent.stopService(self) iqDeferreds = self._iqDeferreds self._iqDeferreds = {} for d in iqDeferreds.itervalues(): d.errback(xmlstream.TimeoutError("Shutting down")) def request(self, request): if request.stanzaKind != 'iq' or request.stanzaType not in ('get', 'set'): return defer.fail(ValueError("Not a request")) element = request.toElement() # Make sure we have a trackable id on the stanza if not request.stanzaID: element.addUniqueId() request.stanzaID = element['id'] # Set up iq response tracking d = defer.Deferred() self._iqDeferreds[element['id']] = d timeout = getattr(request, 'timeout', None) if timeout is not None: def onTimeout(): del self._iqDeferreds[element['id']] d.errback(xmlstream.TimeoutError("IQ timed out")) call = reactor.callLater(timeout, onTimeout) def cancelTimeout(result): if call.active(): call.cancel() return result d.addBoth(cancelTimeout) self.send(element) return d def _onIQResponse(self, iq): try: d = self._iqDeferreds[iq["id"]] except KeyError: return del self._iqDeferreds[iq["id"]] iq.handled = True if iq['type'] == 'error': d.errback(error.exceptionFromStanza(iq)) else: d.callback(iq) class SylkRouter(Router): def route(self, stanza): """ Route a stanza. (subclassed to avoid vebose logging) @param stanza: The stanza to be routed. @type stanza: L{domish.Element}. """ destination = internJID(stanza['to']) if destination.host in self.routes: self.routes[destination.host].send(stanza) else: self.routes[None].send(stanza) class LoggingXMLStream(xmlstream.XmlStream): notification_center = NotificationCenter() def __init__(self, *args, **kw): xmlstream.XmlStream.__init__(self, *args, **kw) self.rawDataInFn = self._log_incoming_message self.rawDataOutFn = self._log_outgoing_message def _log_incoming_message(self, message): self.notification_center.post_notification('XMPPMessageTrace', sender=self, data=NotificationData(direction='INCOMING', message=message)) def _log_outgoing_message(self, message): self.notification_center.post_notification('XMPPMessageTrace', sender=self, data=NotificationData(direction='OUTGOING', message=message)) # Modify wokkel's factories to not be noisy and to use our logging protocol XMPPS2SServerFactory.noisy = False XMPPS2SServerFactory.protocol = LoggingXMLStream DeferredS2SClientFactory.noisy = False DeferredS2SClientFactory.protocol = LoggingXMLStream diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py index 53d499d..458566f 100644 --- a/sylk/applications/xmppgateway/xmpp/session.py +++ b/sylk/applications/xmppgateway/xmpp/session.py @@ -1,214 +1,215 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlib import coros, proc from twisted.internet import reactor from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, MUCAvailabilityPresence -__all__ = ['XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager'] + +__all__ = 'XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager' # Chat sessions class XMPPChatSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.pending_receipts = {} self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): NotificationCenter().post_notification('XMPPChatSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self.send_composing_indication('gone') self._clear_pending_receipts() self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, body, html_body, message_id=None, use_receipt=True): message = ChatMessage(self.local_identity, self.remote_identity, body, html_body, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer NotificationCenter().post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_composing_indication(self, state, message_id=None, use_receipt=False): message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer NotificationCenter().post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_receipt_acknowledgement(self, receipt_id): message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) self.xmpp_manager.send_stanza(message) def send_error(self, stanza, error_type, conditions): message = ErrorStanza.from_stanza(stanza, error_type, conditions) self.xmpp_manager.send_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, ChatMessage): notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, ChatComposingIndication): if item.state == 'gone': self._clear_pending_receipts() notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='remote')) self.state = 'terminated' break else: notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=NotificationData(message=item)) elif isinstance(item, MessageReceipt): if item.receipt_id in self.pending_receipts: timer = self.pending_receipts.pop(item.receipt_id) timer.cancel() notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=NotificationData(message_id=item.receipt_id)) elif isinstance(item, ErrorStanza): if item.id in self.pending_receipts: timer = self.pending_receipts.pop(item.id) timer.cancel() # TODO: translate cause notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=item.id, code=503, reason='Service Unavailable')) self._proc = None def _receipt_timer_expired(self, message_id): self.pending_receipts.pop(message_id) NotificationCenter().post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) def _clear_pending_receipts(self): notification_center = NotificationCenter() while self.pending_receipts: message_id, timer = self.pending_receipts.popitem() timer.cancel() notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) class XMPPChatSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPChatSessionDidStart') notification_center.add_observer(self, name='XMPPChatSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPChatSessionDidStart') notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPChatSessionDidStart(self, notification): session = notification.sender self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session def _NH_XMPPChatSessionDidEnd(self, notification): session = notification.sender del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] # MUC sessions class XMPPIncomingMucSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): NotificationCenter().post_notification('XMPPIncomingMucSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, sender, body, html_body, message_id=None): # TODO: timestamp? message = GroupChatMessage(sender, self.remote_identity, body, html_body, id=message_id) self.xmpp_manager.send_muc_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, GroupChatMessage): notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, MUCAvailabilityPresence): if item.available: nickname = item.recipient.uri.resource notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=NotificationData(stanza=item, nickname=nickname)) else: notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' break self._proc = None class XMPPMucSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming = {} self.outgoing = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.add_observer(self, name='XMPPIncomingMucSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPIncomingMucSessionDidStart(self, notification): muc = notification.sender self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] = muc def _NH_XMPPIncomingMucSessionDidEnd(self, notification): muc = notification.sender del self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] diff --git a/sylk/applications/xmppgateway/xmpp/subscription.py b/sylk/applications/xmppgateway/xmpp/subscription.py index 234e11c..ac70b8c 100644 --- a/sylk/applications/xmppgateway/xmpp/subscription.py +++ b/sylk/applications/xmppgateway/xmpp/subscription.py @@ -1,198 +1,199 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlib import coros, proc from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import SubscriptionPresence, ProbePresence, AvailabilityPresence -__all__ = ['XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager'] + +__all__ = 'XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager' class XMPPSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: NotificationCenter().post_notification('XMPPSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): NotificationCenter().post_notification('XMPPSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) self.subscribe() def end(self): if self.state == 'terminated': return self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def subscribe(self): self.state = 'subscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') self.xmpp_manager.send_stanza(stanza) # If we are already subscribed we may not receive an answer, send a probe just in case self._send_probe() def unsubscribe(self): self.state = 'unsubscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') self.xmpp_manager.send_stanza(stanza) def _send_probe(self): self.state = 'subscribe_sent' stanza = ProbePresence(self.local_identity, self.remote_identity) self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, AvailabilityPresence): if self.state == 'subscribe_sent': self.state = 'active' notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=NotificationData(presence=item)) elif isinstance(item, SubscriptionPresence): if self.state == 'subscribe_sent' and item.type == 'subscribed': self.state = 'active' elif item.type == 'unsubscribed': prev_state = self.state self.state = 'terminated' if prev_state in ('active', 'unsubscribe_sent'): notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self) else: notification_center.post_notification('XMPPSubscriptionDidFail', sender=self) break self._proc = None class XMPPIncomingSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: NotificationCenter().post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): NotificationCenter().post_notification('XMPPIncomingSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) def end(self): if self.state == 'terminated': return self.state = 'terminated' self._proc.kill() self._proc = None NotificationCenter().post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) def accept(self): self.state = 'active' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') self.xmpp_manager.send_stanza(stanza) def reject(self): self.state = 'terminating' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') self.xmpp_manager.send_stanza(stanza) self.end() def send_presence(self, stanza): self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, SubscriptionPresence): if item.type == 'subscribe': notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self) elif item.type == 'unsubscribe': self.state = 'terminated' notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self) notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) break elif isinstance(item, ProbePresence): notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self) self._proc = None class XMPPSubscriptionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming_subscriptions = {} self.outgoing_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPSubscriptionDidStart') notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPSubscriptionDidStart(self, notification): subscription = notification.sender self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPSubscriptionDidEnd(self, notification): subscription = notification.sender del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] def _NH_XMPPIncomingSubscriptionDidStart(self, notification): subscription = notification.sender self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] diff --git a/sylk/bonjour.py b/sylk/bonjour.py index 4e948cb..354e184 100644 --- a/sylk/bonjour.py +++ b/sylk/bonjour.py @@ -1,256 +1,258 @@ import uuid from application import log from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib import api, coros, proc from eventlib.green import select from sipsimple.account.bonjour import _bonjour, BonjourPresenceState, BonjourRegistrationFile from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from threading import Lock from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount -class RestartSelect(Exception): pass +class RestartSelect(Exception): + pass + class BonjourService(object): implements(IObserver) def __init__(self, service='sipfocus', name='SylkServer', uri_user=None, is_focus=True): self.account = DefaultAccount() self.service = service self.name = name self.uri_user = uri_user self.is_focus = is_focus self.id = str(uuid.uuid4()) self._stopped = True self._files = [] self._command_channel = coros.queue() self._select_proc = None self._register_timer = None self._update_timer = None self._lock = Lock() self.__dict__['presence_state'] = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) self._activate() @run_in_green_thread def stop(self): self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def _activate(self): self._stopped = False self._command_channel.send(Command('register')) def _deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._stopped = True def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) def _get_presence_state(self): return self.__dict__['presence_state'] def _set_presence_state(self, state): if state is not None and not isinstance(state, BonjourPresenceState): raise ValueError("state must be a %s instance or None" % BonjourPresenceState.__name__) with self._lock: old_state = self.__dict__['presence_state'] self.__dict__['presence_state'] = state if state != old_state: call_in_twisted_thread(self.update_registrations) presence_state = property(_get_presence_state, _set_presence_state) del _get_presence_state, _set_presence_state def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self, data=NotificationData(name=name, transport=file.transport)) else: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if not self._stopped: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() for transport in missing_transports: notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport)) try: contact_uri = self.account.contact[transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') file = _bonjour.DNSServiceRegister(name=str(contact_uri), regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'), port=contact_uri.port, callBack=self._register_cb, txtRecord=_bonjour.TXTRecord(items=txtdata)) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact_uri = self.account.contact[file.transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self, data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.exception() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = self._files self._files = [] self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_NetworkConditionsDidChange(self, notification): if self._files: self.restart_registration() diff --git a/sylk/configuration/settings.py b/sylk/configuration/settings.py index bfc8d98..089b2e2 100644 --- a/sylk/configuration/settings.py +++ b/sylk/configuration/settings.py @@ -1,170 +1,170 @@ -""" -SIP SIMPLE SDK settings extensions. -""" - -__all__ = ['AccountExtension', 'BonjourAccountExtension', 'SylkServerSettingsExtension'] +"""SIP SIMPLE SDK settings extensions""" import os from application import log from sipsimple.account import MSRPSettings as AccountMSRPSettings, NATTraversalSettings as AccountNATTraversalSettings from sipsimple.account import RTPSettings as AccountRTPSettings, SIPSettings as AccountSIPSettings, TLSSettings as AccountTLSSettings, SRTPEncryptionSettings as AccountSRTPEncryptionSettings from sipsimple.account import MessageSummarySettings as AccountMessageSummarySettings, PresenceSettings as AccountPresenceSettingss, XCAPSettings as AccountXCAPSettings from sipsimple.configuration import CorrelatedSetting, Setting, SettingsObjectExtension from sipsimple.configuration.datatypes import MSRPConnectionModel, MSRPTransport, NonNegativeInteger, PortRange, SampleRate, SIPTransportList, SRTPKeyNegotiation from sipsimple.configuration.settings import AudioSettings, EchoCancellerSettings, FileTransferSettings, LogsSettings, RTPSettings, SIPSettings, TLSSettings from sylk import __version__ as server_version from sylk.configuration import ServerConfig, SIPConfig, MSRPConfig, RTPConfig from sylk.configuration.datatypes import AudioCodecs, Path, Port, SIPProxyAddress +__all__ = 'AccountExtension', 'BonjourAccountExtension', 'SylkServerSettingsExtension' + + # Account settings extensions class AccountMessageSummarySettingsExtension(AccountMessageSummarySettings): enabled = Setting(type=bool, default=False) class AccountMSRPSettingsExtension(AccountMSRPSettings): transport = Setting(type=MSRPTransport, default='tls' if MSRPConfig.use_tls else 'tcp') connection_model = Setting(type=MSRPConnectionModel, default='relay' if ServerConfig.enable_bonjour else 'acm') class AccountNATTraversalSettingsExtension(AccountNATTraversalSettings): use_ice = Setting(type=bool, default=SIPConfig.enable_ice) use_msrp_relay_for_outbound = Setting(type=bool, default=False) class AccountPresenceSettingssExtension(AccountPresenceSettingss): enabled = Setting(type=bool, default=False) if RTPConfig.srtp_encryption == 'disabled': # doesn't matter because it's disabled srtp_key_negotiation = 'opportunistic' elif RTPConfig.srtp_encryption == 'sdes': srtp_key_negotiation = 'sdes_optional' else: srtp_key_negotiation = RTPConfig.srtp_encryption + class AccountSRTPEncryptionSettingsExtension(AccountSRTPEncryptionSettings): enabled = Setting(type=bool, default=RTPConfig.srtp_encryption!='disabled') key_negotiation = Setting(type=SRTPKeyNegotiation, default=srtp_key_negotiation) class AccountRTPSettingsExtension(AccountRTPSettings): audio_codec_list = Setting(type=AudioCodecs, default=None, nillable=True) encryption = AccountSRTPEncryptionSettingsExtension class AccountSIPSettingsExtension(AccountSIPSettings): register = Setting(type=bool, default=False) outbound_proxy = Setting(type=SIPProxyAddress, default=SIPConfig.outbound_proxy, nillable=True) account_cert = ServerConfig.certificate if account_cert is not None and not os.path.isfile(account_cert): account_cert = None + class AccountTLSSettingsExtension(AccountTLSSettings): certificate = Setting(type=Path, default=account_cert, nillable=True) verify_server = Setting(type=bool, default=ServerConfig.verify_server) class AccountXCAPSettingsExtension(AccountXCAPSettings): enabled = Setting(type=bool, default=False) class AccountExtension(SettingsObjectExtension): enabled = Setting(type=bool, default=True) message_summary = AccountMessageSummarySettingsExtension msrp = AccountMSRPSettingsExtension nat_traversal = AccountNATTraversalSettingsExtension presence = AccountPresenceSettingssExtension rtp = AccountRTPSettingsExtension sip = AccountSIPSettingsExtension tls = AccountTLSSettingsExtension xcap = AccountXCAPSettingsExtension class BonjourAccountExtension(SettingsObjectExtension): enabled = Setting(type=bool, default=False) # General settings extensions class EchoCancellerSettingsExtension(EchoCancellerSettings): enabled = Setting(type=bool, default=False) tail_length = Setting(type=NonNegativeInteger, default=0) class AudioSettingsExtension(AudioSettings): input_device = Setting(type=str, default=None, nillable=True) output_device = Setting(type=str, default=None, nillable=True) sample_rate = Setting(type=SampleRate, default=RTPConfig.sample_rate) echo_canceller = EchoCancellerSettings class FileTransferSettingsExtension(FileTransferSettings): directory = Setting(type=Path, default=Path(os.path.join(ServerConfig.spool_dir.normalized, 'file_transfers'))) class LogsSettingsExtension(LogsSettings): directory = Setting(type=Path, default=ServerConfig.trace_dir) trace_sip = Setting(type=bool, default=ServerConfig.trace_sip) trace_msrp = Setting(type=bool, default=ServerConfig.trace_msrp) trace_pjsip = Setting(type=bool, default=ServerConfig.trace_core) class RTPSettingsExtension(RTPSettings): audio_codec_list = Setting(type=AudioCodecs, default=RTPConfig.audio_codecs) port_range = Setting(type=PortRange, default=PortRange(RTPConfig.port_range.start, RTPConfig.port_range.end)) timeout = Setting(type=NonNegativeInteger, default=RTPConfig.timeout) ca_file = ServerConfig.ca_file if ca_file is not None and not os.path.isfile(ca_file): ca_file = None class TLSSettingsExtension(TLSSettings): ca_list = Setting(type=Path, default=ca_file, nillable=True) def sip_port_validator(port, sibling_port): if port == sibling_port != 0: raise ValueError("the TCP and TLS ports must be different") transport_list = [] if SIPConfig.local_udp_port is not None: transport_list.append('udp') if SIPConfig.local_tcp_port is not None: transport_list.append('tcp') tls_port = SIPConfig.local_tls_port if tls_port is not None and None in (ca_file, account_cert): log.warning('Cannot enable TLS because the CA or the certificate are not specified') tls_port = None if tls_port is not None: transport_list.append('tls') class SIPSettingsExtension(SIPSettings): udp_port = Setting(type=Port, default=SIPConfig.local_udp_port, nillable=True) tcp_port = CorrelatedSetting(type=Port, sibling='tls_port', validator=sip_port_validator, default=SIPConfig.local_tcp_port, nillable=True) tls_port = CorrelatedSetting(type=Port, sibling='tcp_port', validator=sip_port_validator, default=tls_port, nillable=True) transport_list = Setting(type=SIPTransportList, default=transport_list) class SylkServerSettingsExtension(SettingsObjectExtension): user_agent = Setting(type=str, default='SylkServer-%s' % server_version) audio = AudioSettingsExtension file_transfer = FileTransferSettingsExtension logs = LogsSettingsExtension rtp = RTPSettingsExtension sip = SIPSettingsExtension tls = TLSSettingsExtension - diff --git a/sylk/interfaces/sipthor.py b/sylk/interfaces/sipthor.py index 9c5dd9a..6490252 100644 --- a/sylk/interfaces/sipthor.py +++ b/sylk/interfaces/sipthor.py @@ -1,108 +1,110 @@ -__all__ = ['ConferenceNode'] - from application import log from application.notification import NotificationCenter, NotificationData from application.python.types import Singleton from eventlib.twistedutil import block_on, callInGreenThread from gnutls.interfaces.twisted import TLSContext, X509Credentials from twisted.internet import defer from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.scheduler import KeepRunning import sylk from sylk.configuration import SIPConfig, ThorNodeConfig +__all__ = 'ConferenceNode', + + class ConferenceNode(EventServiceClient): __metaclass__ = Singleton + topics = ["Thor.Members"] def __init__(self): pass def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True available_connectors = set(c for c in self.connectors if not c.failed) if not available_connectors: NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self) def start(self, roles): # Needs to be called from a green thread log.info('Publishing %s roles to SIPThor' % roles) self.node = ThorEntity(SIPConfig.local_ip.normalized, roles, version=sylk.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) def stop(self): # Needs to be called from a green thread self._shutdown() def _monitor_event_servers(self): def wrapped_func(): servers = self._get_event_servers() self._update_event_servers(servers) callInGreenThread(wrapped_func) return KeepRunning def _disconnect_all(self): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) block_on(d) self._disconnect_all() def handle_event(self, event): #print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role updated = False for role in self.node.roles + ('sip_proxy',): try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.info("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) updated = True if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.info("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) updated = True if updated: NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=NotificationData(networks=self.networks)) diff --git a/sylk/server.py b/sylk/server.py index 69583bc..b83fba2 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,256 +1,255 @@ import os import sys from threading import Event from uuid import uuid4 from application import log from application.notification import NotificationCenter from application.python import Null from application.system import makedirs from eventlib import proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from sipsimple.video import VideoDevice from twisted.internet import reactor # Load stream extensions needed for integration with SIP SIMPLE SDK -import sylk.streams -del sylk.streams +import sylk.streams; del sylk.streams from sylk.accounts import DefaultAccount from sylk.applications import IncomingRequestHandler from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import TraceLogManager from sylk.session import SessionManager from sylk.web import WebServer class SylkServer(SIPApplication): def __init__(self): self.request_handler = Null self.thor_interface = Null self.web_server = Null self.options = Null self.stopping_event = Event() self.stop_event = Event() self.failed = False def start(self, options): self.options = options if self.options.enable_bonjour: ServerConfig.enable_bonjour = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: super(SylkServer, self).start(MemoryStorage()) except Exception as e: log.fatal('Error starting SIP Application: %s' % e) sys.exit(1) def _initialize_core(self): # SylkServer needs to listen for extra events and request types notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), enable_colorbar_device=True, # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events={'conference', 'presence'}, incoming_requests={'MESSAGE'}) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) @run_in_green_thread def _initialize_subsystems(self): notification_center = NotificationCenter() with self._lock: stop_pending = self._stop_pending if stop_pending: self.state = 'stopping' if stop_pending: notification_center.post_notification('SIPApplicationWillEnd', sender=self) reactor.stop() return account_manager = AccountManager() dns_manager = DNSManager() session_manager = SessionManager() settings = SIPSimpleSettings() # Initialize default account default_account = DefaultAccount() account_manager.default_account = default_account # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize video objects self.video_device = VideoDevice(u'Colorbar generator', settings.video.resolution, settings.video.framerate) # initialize instance id settings.instance_id = uuid4().urn settings.save() # initialize ZRTP cache makedirs(ServerConfig.spool_dir.normalized) self.engine.zrtp_cache = os.path.join(ServerConfig.spool_dir.normalized, 'zrtp.db') # initialize middleware components dns_manager.start() account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) # start SylkServer components self.web_server = WebServer() self.web_server.start() self.request_handler = IncomingRequestHandler() self.request_handler.start() if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode self.thor_interface = ConferenceNode() thor_roles = [] if 'conference' in self.request_handler.application_registry: thor_roles.append('conference_server') if 'xmppgateway' in self.request_handler.application_registry: thor_roles.append('xmpp_gateway') if 'webrtcgateway' in self.request_handler.application_registry: thor_roles.append('webrtc_gateway') self.thor_interface.start(thor_roles) @run_in_green_thread def _shutdown_subsystems(self): dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() # terminate all sessions p = proc.spawn(session_manager.stop) p.wait() # shutdown SylkServer components procs = [proc.spawn(self.web_server.stop), proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)] proc.waitall(procs) # shutdown other middleware components procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop)] proc.waitall(procs) # shutdown engine self.engine.stop() self.engine.join(timeout=5) # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _NH_AudioDevicesDidChange(self, notification): pass def _NH_DefaultAudioDeviceDidChange(self, notification): pass def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal('Could not set TLS options: %s' % notification.data.error) sys.exit(1) def _NH_SIPApplicationWillStart(self, notification): tracelog_manager = TraceLogManager() tracelog_manager.start() def _NH_SIPApplicationDidStart(self, notification): settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.info('SylkServer started, listening on:') for transport in settings.sip.transport_list: try: log.info(' %s:%d (%s)' % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): self.stopping_event.set() def _NH_SIPApplicationDidEnd(self, notification): log.info('SIP application ended') tracelog_manager = TraceLogManager() tracelog_manager.stop() if not self.stopping_event.is_set(): log.warning('SIP application ended without shutting down all subsystems') self.stopping_event.set() self.stop_event.set() def _NH_SIPApplicationGotFatalError(self, notification): log.error('An exception occurred within the SIP core:\n%s\n' % notification.data.traceback) self.failed = True def _NH_SIPEngineDidFail(self, notification): log.error('SIP engine failed') self.failed = True super(SylkServer, self)._NH_SIPEngineDidFail(notification) def _NH_ThorNetworkGotFatalError(self, notification): log.error("All Thor Event Servers have unrecoverable errors.") diff --git a/sylk/session.py b/sylk/session.py index 0f52841..f7f9406 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,1978 +1,1986 @@ import random -from threading import RLock -from time import time - from application import log from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton from application.system import host from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, SIPCoreInvalidStateError, PJSIPError, sip_status_messages from sipsimple.core import ContactHeader, RouteHeader, FromHeader, ToHeader, ReasonHeader, WarningHeader from sipsimple.core import SIPURI, SDPConnection, SDPSession, SDPMediaStream from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp +from threading import RLock +from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.configuration import SIPConfig class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data + class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data + class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data + class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes + class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data + class InterruptSubscription(Exception): pass + class TerminateSubscription(Exception): pass + class IllegalStateError(RuntimeError): pass + @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner + @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError as e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class Session(object): implements(IObserver) media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): remote_sdp = invitation.sdp.proposed_remote if not remote_sdp: invitation.send_response(488) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if not self.proposed_streams: invitation.send_response(488) return if 'Replaces' in data.headers: invitation.send_response(403) return self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, from_header, to_header, route, streams, is_focus=False, contact_header=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = route self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] except KeyError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) if SIPConfig.advertised_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.advertised_ip.normalized elif SIPConfig.local_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.local_ip.normalized else: local_ip = contact_header.uri.host connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) route_header = RouteHeader(self.route.uri) if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, ) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason else: code = getattr(e.data, 'code', 0) reason = getattr(e.data, 'reason', 'Session disconnected') if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and ad self.state = 'sending_proposal' self.greenlet = api.getcurrent() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') self.proposed_streams = streams wait_count = len(self.proposed_streams) try: while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote if SIPConfig.advertised_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.advertised_ip.normalized elif SIPConfig.local_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.local_ip.normalized else: sdp_connection = remote_sdp.connection or next(media.connection for media in remote_sdp.media if media.connection is not None) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[self._invitation.transport] contact_header.uri = local_contact_uri except KeyError: pass if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError as e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream becuase it couldn't # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except api.TimeoutError: self._fail_proposal(originator='remote', error='media stream timed out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.streams + streams proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': break except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if notification.data.code >= 300: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except api.TimeoutError: self._fail_proposal(originator='local', error='media stream timed out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 487: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) except InvitationDisconnectedError as e: self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None or self._invitation.state is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break except SIPCoreError as e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) except InvitationDisconnectedError as e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None @property def call_id(self): return self._invitation.call_id if self._invitation is not None else None @property def request_uri(self): return self._invitation.request_uri if self._invitation is not None else None def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError: pass except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400 <= code <= 699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break except (api.TimeoutError, SIPCoreError): pass notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() has_streams = bool(self.proposed_streams) for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if has_streams else 500) except SIPCoreError: pass notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._invitation.notify_transfer_progress(500) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() class SessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: account = DefaultAccount() notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification) diff --git a/sylk/streams.py b/sylk/streams.py index 1e9bb99..b228ac9 100644 --- a/sylk/streams.py +++ b/sylk/streams.py @@ -1,382 +1,383 @@ + import random from collections import defaultdict from functools import partial from application.notification import NotificationCenter, NotificationData from eventlib import api from eventlib.coros import queue from eventlib.proc import spawn, ProcExit from msrplib.connect import DirectConnector, DirectAcceptor from msrplib.protocol import URI, FailureReportHeader, SuccessReportHeader, UseNicknameHeader from msrplib.session import contains_mime_type, MSRPSession from msrplib.transport import make_response from sipsimple.core import SDPAttribute from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase as _MSRPStreamBase, MSRPStreamError, NotificationProxyLogger from sipsimple.streams.msrp.chat import ChatStream as _ChatStream, ChatStreamError, ChatIdentity, Message, QueuedMessage, CPIMPayload, CPIMParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from sylk.configuration import SIPConfig @run_in_green_thread def MSRPStreamBase_initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.session.account.msrp.transport outgoing = direction=='outgoing' logger = NotificationProxyLogger() if self.session.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif not outgoing: if self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' else: # outgoing self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(local_uri=URI(host=SIPConfig.local_ip.normalized, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials)) self.local_media = self._create_local_media(full_local_path) except Exception as e: notification_center.post_notification('MediaStreamDidNotInitialize', self, NotificationData(reason=str(e))) else: self._initialized = True notification_center.post_notification('MediaStreamDidInitialize', self) finally: self.greenlet = None # Monkey-patch the initialize method (needed because we want every MSRP based stream to behave this way, including file transfers) # _MSRPStreamBase.initialize = MSRPStreamBase_initialize class ChatStream(_MSRPStreamBase): type = 'chat' priority = _ChatStream.priority + 1 msrp_session_class = MSRPSession media_type = 'message' accept_types = ['message/cpim'] accept_wrapped_types = ['*'] supported_chatroom_capabilities = ['nickname', 'private-messages', 'com.ag-projects.screen-sharing', 'com.ag-projects.zrtp-sas'] def __init__(self): super(ChatStream, self).__init__(direction='sendrecv') self.message_queue = queue() self.sent_messages = set() self.incoming_queue = defaultdict(list) self.message_queue_thread = None @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] if remote_stream.media != 'message': raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport: raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != ['*']: raise InvalidStreamError("wrong format list specified") stream = cls() stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') if remote_stream.direction != 'sendrecv': raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction) remote_accept_types = remote_stream.attributes.getfirst('accept-types') if remote_accept_types is None: raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute") if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.split()): raise InvalidStreamError("no compatible media types found") return stream @property def local_identity(self): try: return ChatIdentity(self.session.local_identity.uri, self.session.account.display_name) except AttributeError: return None @property def remote_identity(self): try: return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) except AttributeError: return None @property def private_messages_allowed(self): return 'private-messages' in self.chatroom_capabilities @property def nickname_allowed(self): return 'nickname' in self.chatroom_capabilities @property def chatroom_capabilities(self): try: if self.session.local_focus: return ' '.join(self.local_media.attributes.getall('chatroom')).split() elif self.session.remote_focus: return ' '.join(self.remote_media.attributes.getall('chatroom')).split() except AttributeError: pass return [] def _NH_MediaStreamDidStart(self, notification): self.message_queue_thread = spawn(self._message_queue_handler) def _NH_MediaStreamDidNotInitialize(self, notification): message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _NH_MediaStreamDidEnd(self, notification): if self.message_queue_thread is not None: self.message_queue_thread.kill() else: message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _create_local_media(self, uri_path): local_media = super(ChatStream, self)._create_local_media(uri_path) if self.session.local_focus and self.supported_chatroom_capabilities: local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.supported_chatroom_capabilities))) return local_media def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. if chunk.message_id in self.sent_messages: self.sent_messages.remove(chunk.message_id) notification_center = NotificationCenter() data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment) if chunk.status.code == 200: notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data) else: notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_SEND(self, chunk): # This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return if self.direction == 'sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.content_type.lower() != 'message/cpim': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return if chunk.contflag == '#': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 200, 'OK') return elif chunk.contflag == '+': self.incoming_queue[chunk.message_id].append(chunk.data) self.msrp_session.send_report(chunk, 200, 'OK') return else: data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data try: payload = CPIMPayload.decode(data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return message = Message(**{name: getattr(payload, name) for name in Message.__slots__}) if not contains_mime_type(self.accept_wrapped_types, message.content_type): self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return if message.timestamp is None: message.timestamp = ISOTimestamp.now() if message.sender is None: message.sender = self.remote_identity if payload.charset is not None: message.content = message.content.decode(payload.charset) private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: try: data = IsComposingDocument.parse(message.content) except ParserError as e: self.msrp_session.send_report(chunk, 400, str(e)) return ndata = NotificationData(state=data.state.value, refresh=data.refresh.value if data.refresh is not None else 120, content_type=data.content_type.value if data.content_type is not None else None, last_active=data.last_active.value if data.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata) else: ndata = NotificationData(message=message, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotMessage', self, ndata) def _handle_NICKNAME(self, chunk): nickname = chunk.headers['Use-Nickname'].decoded NotificationCenter().post_notification('ChatStreamGotNicknameRequest', self, NotificationData(nickname=nickname, chunk=chunk)) def _on_transaction_response(self, message_id, response): if message_id in self.sent_messages and response.code != 200: self.sent_messages.remove(message_id) data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment) NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _on_nickname_transaction_response(self, message_id, response): notification_center = NotificationCenter() if response.code == 200: notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response)) else: notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)) def _message_queue_handler(self): notification_center = NotificationCenter() try: while True: message = self.message_queue.wait() if self.msrp_session is None: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) break try: if isinstance(message.content, unicode): message.content = message.content.encode('utf8') charset = 'utf8' else: charset = None message.sender = message.sender or self.local_identity message.recipients = message.recipients or [self.remote_identity] message.timestamp = message.timestamp or ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) except ChatStreamError as e: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0]) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) continue else: content, content_type = payload.encode() message_id = message.id notify_progress = message.notify_progress report = 'yes' if notify_progress else 'no' chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id) chunk.add_header(FailureReportHeader(report)) chunk.add_header(SuccessReportHeader(report)) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id)) except Exception as e: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e)) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) except ProcExit: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) raise else: if notify_progress: self.sent_messages.add(message_id) notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk)) finally: self.message_queue_thread = None while self.sent_messages: message_id = self.sent_messages.pop() data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) @run_in_twisted_thread def _enqueue_message(self, message): if self._done: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) else: self.message_queue.send(message) @run_in_green_thread def _send_nickname_response(self, chunk, code, reason): response = make_response(chunk, code, reason) try: self.msrp_session.send_chunk(response) except Exception: pass def accept_nickname(self, chunk): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) self._send_nickname_response(chunk, 200, 'OK') def reject_nickname(self, chunk, code, reason): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) self._send_nickname_response(chunk, code, reason) def send_message(self, content, content_type='text/plain', sender=None, recipients=None, timestamp=None, additional_headers=None, message_id=None, notify_progress=True): message = QueuedMessage(content, content_type, sender=sender, recipients=recipients, timestamp=timestamp, additional_headers=additional_headers, id=message_id, notify_progress=notify_progress) self._enqueue_message(message) return message.id def send_composing_indication(self, state, refresh=None, last_active=None, sender=None, recipients=None, message_id=None, notify_progress=False): content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text')) message = QueuedMessage(content, IsComposingDocument.content_type, sender=sender, recipients=recipients, id=message_id, notify_progress=notify_progress) self._enqueue_message(message) return message.id @run_in_green_thread def _set_local_nickname(self, nickname, message_id): if self.msrp_session is None: # should we generate ChatStreamDidNotSetNickname here? return chunk = self.msrp.make_request('NICKNAME') chunk.add_header(UseNicknameHeader(nickname or u'')) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id)) except Exception as e: self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason)) def set_local_nickname(self, nickname): if not self.nickname_allowed: raise ChatStreamError('Setting nickname is not supported') message_id = '%x' % random.getrandbits(64) self._set_local_nickname(nickname, message_id) return message_id diff --git a/sylk/tls.py b/sylk/tls.py index f6dc369..bbaefae 100644 --- a/sylk/tls.py +++ b/sylk/tls.py @@ -1,50 +1,49 @@ -"""TLS helper classes""" - -__all__ = ['Certificate', 'PrivateKey'] - -from gnutls.crypto import X509Certificate, X509PrivateKey - from application import log from application.process import process +from gnutls.crypto import X509Certificate, X509PrivateKey + + +__all__ = 'Certificate', 'PrivateKey' def file_content(file): path = process.config_file(file) if path is None: raise Exception("File '%s' does not exist" % file) try: f = open(path, 'rt') except Exception: raise Exception("File '%s' could not be open" % file) try: return f.read() finally: f.close() + class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509Certificate(file_content(value)) except Exception as e: log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError('value should be a string') + class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509PrivateKey(file_content(value)) except Exception as e: log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError('value should be a string') - diff --git a/sylk/web.py b/sylk/web.py index 387e28b..f3bd268 100644 --- a/sylk/web.py +++ b/sylk/web.py @@ -1,91 +1,92 @@ -__all__ = ['Klein', 'StaticFileResource', 'WebServer', 'server'] - -import os - from application import log from application.python.types import Singleton from klein import Klein from twisted.internet import reactor from twisted.internet.ssl import DefaultOpenSSLContextFactory from twisted.web.resource import Resource, NoResource from twisted.web.server import Site from twisted.web.static import File from sylk import __version__ from sylk.configuration import WebServerConfig -# Set the 'Server' header string which Twisted Web will use +import os import twisted.web.server + + +__all__ = 'Klein', 'StaticFileResource', 'WebServer', 'server' + + +# Set the 'Server' header string which Twisted Web will use twisted.web.server.version = b'SylkServer/%s' % __version__ class StaticFileResource(File): def directoryListing(self): return NoResource('Directory listing not available') class RootResource(Resource): isLeaf = True def render_GET(self, request): request.setHeader('Content-Type', 'text/plain') return 'Welcome to SylkServer!' class WebServer(object): __metaclass__ = Singleton def __init__(self): self.base = Resource() self.base.putChild('', RootResource()) self.site = Site(self.base, logPath=os.devnull) self.site.noisy = False self.listener = None @property def url(self): return self.__dict__.get('url', '') def register_resource(self, path, resource): self.base.putChild(path, resource) def start(self): interface = WebServerConfig.local_ip port = WebServerConfig.local_port cert_path = WebServerConfig.certificate.normalized if WebServerConfig.certificate else None cert_chain_path = WebServerConfig.certificate_chain.normalized if WebServerConfig.certificate_chain else None if cert_path is not None: if not os.path.isfile(cert_path): log.error('Certificate file %s could not be found' % cert_path) return try: ssl_ctx_factory = DefaultOpenSSLContextFactory(cert_path, cert_path) except Exception: log.exception('Creating TLS context') return if cert_chain_path is not None: if not os.path.isfile(cert_chain_path): log.error('Certificate chain file %s could not be found' % cert_chain_path) return ssl_ctx = ssl_ctx_factory.getContext() try: ssl_ctx.use_certificate_chain_file(cert_chain_path) except Exception: log.exception('Setting TLS certificate chain file') return self.listener = reactor.listenSSL(port, self.site, ssl_ctx_factory, backlog=511, interface=interface) scheme = 'https' else: self.listener = reactor.listenTCP(port, self.site, backlog=511, interface=interface) scheme = 'http' port = self.listener.getHost().port self.__dict__['url'] = '%s://%s:%d' % (scheme, WebServerConfig.hostname or interface.normalized, port) log.info('Web server listening for requests on: %s' % self.url) def stop(self): if self.listener is not None: self.listener.stopListening() server = WebServer() -