diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py index a0338e1..7ee2443 100644 --- a/sylk/applications/webrtcgateway/__init__.py +++ b/sylk/applications/webrtcgateway/__init__.py @@ -1,30 +1,38 @@ from sylk.applications import SylkApplication from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.util import IdentityFormatter from sylk.applications.webrtcgateway.web import WebHandler +from sylk.applications.webrtcgateway.web.admin import AdminWebHandler +from sylk.applications.webrtcgateway.web.storage import TokenStorage class WebRTCGatewayApplication(SylkApplication): def __init__(self): self.web_handler = WebHandler() + self.admin_web_handler = AdminWebHandler() def start(self): self.web_handler.start() + self.admin_web_handler.start() + # Load tokens from storage + storage = TokenStorage() + storage.load() def stop(self): self.web_handler.stop() + self.admin_web_handler.stop() def incoming_session(self, session): log.msg(u'New incoming session %s from %s rejected' % (session.call_id, IdentityFormatter.format(session.remote_identity))) session.reject(403) def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, request, data): request.reject(405) diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index 40b6871..17dcc00 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,127 +1,134 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting -from application.configuration.datatypes import StringList +from application.configuration.datatypes import NetworkAddress, StringList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress from sylk.resources import VarResources __all__ = ['get_room_config'] # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicySettingValue(list): def __init__(self, value): if isinstance(value, (tuple, list)): l = [str(x) for x in value] elif isinstance(value, basestring): if value.lower() in ('none', ''): return list.__init__(self, []) elif value.lower() in ('any', 'all', '*'): return list.__init__(self, ['*']) else: l = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") values = [] for item in l: if '@' in item: values.append(SIPAddress(item)) else: values.append(Domain(item)) return list.__init__(self, values) def match(self, uri): if self == ['*']: return True (user, domain) = uri.split("@") uri = re.sub('^(sip:|sips:)', '', str(uri)) return uri in self or domain in self +class ManagementInterfaceAddress(NetworkAddress): + default_port = 20888 + + # Configuration objects class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_websocket = False websocket_ping_interval = 120 recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) - - -class RoomConfig(ConfigSection): - __cfgfile__ = 'webrtcgateway.ini' - - record = False - access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) - allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) - deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) + http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) + http_management_auth_secret = ConfigSetting(type=str, value=None) + firebase_server_key = ConfigSetting(type=str, value=None) class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False +class RoomConfig(ConfigSection): + __cfgfile__ = 'webrtcgateway.ini' + + record = False + access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) + allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) + deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) + + class Configuration(object): def __init__(self, data): self.__dict__.update(data) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = Configuration(dict(RoomConfig)) RoomConfig.reset() else: # Apply general policy config = Configuration(dict(RoomConfig)) return config diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py index 1ff432b..934ac42 100644 --- a/sylk/applications/webrtcgateway/models/sylkrtc.py +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -1,191 +1,197 @@ __all__ = ['AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest', 'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest', 'AckResponse', 'ErrorResponse', 'ReadyEvent'] import re from jsonmodels import models, fields, errors, validators from sipsimple.core import SIPURI, SIPCoreError SIP_PREFIX_RE = re.compile('^sips?:') class DefaultValueField(fields.BaseField): def __init__(self, value): self.default_value = value super(DefaultValueField, self).__init__() def validate(self, value): if value != self.default_value: raise errors.ValidationError('%s doesn\'t match the expected value %s' % (value, self.default_value)) def get_default_value(self): return self.default_value def URIValidator(value): account = SIP_PREFIX_RE.sub('', value) try: SIPURI.parse('sip:%s' % account) except SIPCoreError: raise errors.ValidationError('invalid account: %s' % value) def URIListValidator(values): for item in values: URIValidator(item) class OptionsValidator(object): def __init__(self, options): self.options = options def __call__(self, value): if value not in self.options: raise errors.ValidationError('invalid option: %s' % value) # Base models class SylkRTCRequestBase(models.Base): transaction = fields.StringField(required=True) class SylkRTCResponseBase(models.Base): transaction = fields.StringField(required=True) # Miscelaneous models class AckResponse(SylkRTCResponseBase): sylkrtc = DefaultValueField('ack') class ErrorResponse(SylkRTCResponseBase): sylkrtc = DefaultValueField('error') error = fields.StringField(required=True) class ICECandidate(models.Base): candidate = fields.StringField(required=True) sdpMLineIndex = fields.IntField(required=True) sdpMid = fields.StringField(required=True) class ReadyEvent(models.Base): sylkrtc = DefaultValueField('event') event = DefaultValueField('ready') # Account models class AccountRequestBase(SylkRTCRequestBase): account = fields.StringField(required=True, validators=[URIValidator]) class AccountAddRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-add') password = fields.StringField(required=True, validators=[validators.Length(minimum_value=1, maximum_value=9999)]) display_name = fields.StringField(required=False) user_agent = fields.StringField(required=False) class AccountRemoveRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-remove') class AccountRegisterRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-register') class AccountUnregisterRequest(AccountRequestBase): sylkrtc = DefaultValueField('account-unregister') +class AccountDeviceTokenRequest(AccountRequestBase): + sylkrtc = DefaultValueField('account-devicetoken') + old_token = fields.StringField(required=False) + new_token = fields.StringField(required=True) + + # Session models class SessionRequestBase(SylkRTCRequestBase): session = fields.StringField(required=True) class SessionCreateRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-create') account = fields.StringField(required=True, validators=[URIValidator]) uri = fields.StringField(required=True, validators=[URIValidator]) sdp = fields.StringField(required=True) class SessionAnswerRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-answer') sdp = fields.StringField(required=True) class SessionTrickleRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-trickle') candidates = fields.ListField([ICECandidate]) class SessionTerminateRequest(SessionRequestBase): sylkrtc = DefaultValueField('session-terminate') # VideoRoom models class VideoRoomRequestBase(SylkRTCRequestBase): session = fields.StringField(required=True) class VideoRoomJoinRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-join') account = fields.StringField(required=True, validators=[URIValidator]) uri = fields.StringField(required=True, validators=[URIValidator]) sdp = fields.StringField(required=True) class VideoRoomControlTrickleRequest(models.Base): # ID for the subscriber session, if specified, otherwise the publisher is considered session = fields.StringField(required=False) candidates = fields.ListField([ICECandidate]) class VideoRoomControlFeedAttachRequest(models.Base): session = fields.StringField(required=True) publisher = fields.StringField(required=True) class VideoRoomControlFeedAnswerRequest(models.Base): session = fields.StringField(required=True) sdp = fields.StringField(required=True) class VideoRoomControlFeedDetachRequest(models.Base): session = fields.StringField(required=True) class VideoRoomControlInviteParticipantsRequest(models.Base): participants = fields.ListField([str, unicode], validators=[URIListValidator]) class VideoRoomControlRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-ctl') option = fields.StringField(required=True, validators=[OptionsValidator(['trickle', 'feed-attach', 'feed-answer', 'feed-detach', 'invite-participants'])]) # all other options should have optional fields below, and the application needs to do a little validation trickle = fields.EmbeddedField(VideoRoomControlTrickleRequest, required=False) feed_attach = fields.EmbeddedField(VideoRoomControlFeedAttachRequest, required=False) feed_answer = fields.EmbeddedField(VideoRoomControlFeedAnswerRequest, required=False) feed_detach = fields.EmbeddedField(VideoRoomControlFeedDetachRequest, required=False) invite_participants = fields.EmbeddedField(VideoRoomControlInviteParticipantsRequest, required=False) class VideoRoomTerminateRequest(VideoRoomRequestBase): sylkrtc = DefaultValueField('videoroom-terminate') diff --git a/sylk/applications/webrtcgateway/web/admin.py b/sylk/applications/webrtcgateway/web/admin.py new file mode 100644 index 0000000..19f3d01 --- /dev/null +++ b/sylk/applications/webrtcgateway/web/admin.py @@ -0,0 +1,101 @@ + +import json + +from application.python.types import Singleton +from klein import Klein +from twisted.internet import reactor +from twisted.web.server import Site + +from sylk.applications.webrtcgateway.configuration import GeneralConfig +from sylk.applications.webrtcgateway.logger import log +from sylk.applications.webrtcgateway.web import push +from sylk.applications.webrtcgateway.web.storage import TokenStorage + +__all__ = ['AdminWebHandler'] + + +class AuthError(Exception): pass + + +class AdminWebHandler(object): + __metaclass__ = Singleton + + app = Klein() + + def __init__(self): + self.listener = None + + def start(self): + host, port = GeneralConfig.http_management_interface + self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) + log.msg('Admin web handler started at http://%s:%d' % (host, port)) + + def stop(self): + if self.listener is not None: + self.listener.stopListening() + self.listener = None + + # Admin web API + + def _check_auth(self, request): + auth_secret = GeneralConfig.http_management_auth_secret + if auth_secret: + auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) + if not auth_headers or auth_headers[0] != auth_secret: + raise AuthError() + + @app.handle_errors(AuthError) + def auth_error(self, request, failure): + request.setResponseCode(403) + return 'Authentication error' + + @app.route('/incoming_session', methods=['POST']) + def incoming_session(self, request): + self._check_auth(request) + request.setHeader('Content-Type', 'application/json') + try: + data = json.load(request.content) + originator = data['originator'] + destination = data['destination'] + except Exception, e: + return json.dumps({'success': False, 'error': str(e)}) + else: + storage = TokenStorage() + tokens = storage[destination] + push.incoming_session(originator, destination, tokens) + return json.dumps({'success': True}) + + @app.route('/missed_session', methods=['POST']) + def missed_session(self, request): + self._check_auth(request) + request.setHeader('Content-Type', 'application/json') + try: + data = json.load(request.content) + originator = data['originator'] + destination = data['destination'] + except Exception, e: + return json.dumps({'success': False, 'error': str(e)}) + else: + storage = TokenStorage() + tokens = storage[destination] + push.missed_session(originator, destination, tokens) + return json.dumps({'success': True}) + + @app.route('/tokens/') + def get_tokens(self, request, account): + self._check_auth(request) + request.setHeader('Content-Type', 'application/json') + storage = TokenStorage() + tokens = storage[account] + return json.dumps({'tokens': list(tokens)}) + + @app.route('/tokens//', methods=['POST', 'DELETE']) + def process_token(self, request, account, token): + self._check_auth(request) + request.setHeader('Content-Type', 'application/json') + storage = TokenStorage() + if request.method == 'POST': + storage.add(account, token) + elif request.method == 'DELETE': + storage.remove(account, token) + return json.dumps({'success': True}) diff --git a/sylk/applications/webrtcgateway/web/factory.py b/sylk/applications/webrtcgateway/web/factory.py index 52c4be3..317a5d4 100644 --- a/sylk/applications/webrtcgateway/web/factory.py +++ b/sylk/applications/webrtcgateway/web/factory.py @@ -1,79 +1,80 @@ import weakref from application.notification import IObserver, NotificationCenter from application.python import Null +from application.python.types import Singleton from autobahn.twisted.websocket import WebSocketServerFactory from zope.interface import implements from sylk.applications.webrtcgateway.web.protocol import SylkWebSocketServerProtocol, SYLK_WS_PROTOCOL class VideoRoomContainer(object): def __init__(self): self._rooms = set() self._uri_map = weakref.WeakValueDictionary() self._id_map = weakref.WeakValueDictionary() def add(self, room): self._rooms.add(room) self._uri_map[room.uri] = room self._id_map[room.id] = room def remove(self, value): if isinstance(value, int): room = self._id_map.get(value, None) elif isinstance(value, basestring): room = self._uri_map.get(value, None) else: room = value self._rooms.discard(room) def clear(self): self._rooms.clear() def __len__(self): return len(self._rooms) def __iter__(self): return iter(self._rooms) def __getitem__(self, item): if isinstance(item, int): return self._id_map[item] elif isinstance(item, basestring): return self._uri_map[item] else: raise KeyError('%s not found' % item) def __contains__(self, item): return item in self._id_map or item in self._uri_map or item in self._rooms class SylkWebSocketServerFactory(WebSocketServerFactory): implements(IObserver) protocol = SylkWebSocketServerProtocol connections = set() videorooms = VideoRoomContainer() backend = None # assigned by WebHandler def __init__(self, *args, **kw): super(SylkWebSocketServerFactory, self).__init__(*args, **kw) notification_center = NotificationCenter() notification_center.add_observer(self, name='JanusBackendDisconnected') def buildProtocol(self, addr): protocol = self.protocol() protocol.factory = self protocol.backend = self.backend return protocol def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_JanusBackendDisconnected(self, notification): for conn in self.connections.copy(): conn.dropConnection(abort=True) self.videorooms.clear() diff --git a/sylk/applications/webrtcgateway/web/handler.py b/sylk/applications/webrtcgateway/web/handler.py index 381c583..611befd 100644 --- a/sylk/applications/webrtcgateway/web/handler.py +++ b/sylk/applications/webrtcgateway/web/handler.py @@ -1,1311 +1,1338 @@ import json import os import random import re import uuid import weakref from application.python import Null from application.system import makedirs from eventlib import coros, proc from eventlib.twistedutil import block_on from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.models import sylkrtc from sylk.applications.webrtcgateway.util import GreenEvent +from sylk.applications.webrtcgateway.web.storage import TokenStorage SIP_PREFIX_RE = re.compile('^sips?:') class ACLValidationError(Exception): pass sylkrtc_models = { # account management 'account-add' : sylkrtc.AccountAddRequest, 'account-remove' : sylkrtc.AccountRemoveRequest, 'account-register' : sylkrtc.AccountRegisterRequest, 'account-unregister' : sylkrtc.AccountUnregisterRequest, + 'account-devicetoken' : sylkrtc.AccountDeviceTokenRequest, # session management 'session-create' : sylkrtc.SessionCreateRequest, 'session-answer' : sylkrtc.SessionAnswerRequest, 'session-trickle' : sylkrtc.SessionTrickleRequest, 'session-terminate' : sylkrtc.SessionTerminateRequest, # video conference management 'videoroom-join' : sylkrtc.VideoRoomJoinRequest, 'videoroom-ctl' : sylkrtc.VideoRoomControlRequest, 'videoroom-terminate' : sylkrtc.VideoRoomTerminateRequest, } class AccountInfo(object): def __init__(self, id, password, display_name=None, user_agent=None): self.id = id self.password = password self.display_name = display_name self.user_agent = user_agent self.registration_state = None self.janus_handle_id = None @property def uri(self): return 'sip:%s' % self.id class SessionPartyIdentity(object): def __init__(self, uri, display_name=''): self.uri = uri self.display_name = display_name class SIPSessionInfo(object): def __init__(self, id): self.id = id self.direction = None self.state = None self.account_id = None self.local_identity = None # instance of SessionPartyIdentity self.remote_identity = None # instance of SessionPartyIdentity self.janus_handle_id = None self.ice_media_negotiation_started = False self.ice_media_negotiation_ended = False def init_outgoing(self, account_id, destination): self.account_id = account_id self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account_id, originator, originator_display_name=''): self.account_id = account_id self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class VideoRoom(object): def __init__(self, uri): self.config = get_room_config(uri) self.uri = uri self.record = self.config.record self.rec_dir = os.path.join(GeneralConfig.recording_dir, '%s/' % uri) self.id = random.getrandbits(32) # janus needs numeric room names self.destroyed = False self._session_id_map = weakref.WeakValueDictionary() self._publisher_id_map = weakref.WeakValueDictionary() if self.record: makedirs(self.rec_dir, 0755) log.msg('Video room %s created with recording in %s' % (self.uri, self.rec_dir)) else: log.msg('Video room %s created without recording' % self.uri) def add(self, session): assert session.publisher_id is not None assert session.id not in self._session_id_map assert session.publisher_id not in self._publisher_id_map self._session_id_map[session.id] = session self._publisher_id_map[session.publisher_id] = session log.msg('Video room %s: added session %s for %s' % (self.uri, session.id, session.account_id)) def __getitem__(self, key): try: return self._session_id_map[key] except KeyError: return self._publisher_id_map[key] def __len__(self): return len(self._session_id_map) class VideoRoomSessionInfo(object): def __init__(self, id): self.id = id self.account_id = None self.type = None # publisher / subscriber self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers self.janus_handle_id = None self.room = None self.parent_session = None self.feeds = {} # janus publisher ID -> our publisher ID self.ice_media_negotiation_started = False self.ice_media_negotiation_ended = False def initialize(self, account_id, type, room): assert type in ('publisher', 'subscriber') self.account_id = account_id self.type = type self.room = room log.msg('Video room %s: new session %s initialized by %s' % (self.room.uri, self.id, self.account_id)) def __repr__(self): return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type) class VideoRoomSessionContainer(object): def __init__(self): self._sessions = set() self._janus_handle_map = weakref.WeakValueDictionary() self._id_map = weakref.WeakValueDictionary() def add(self, session): assert session not in self._sessions assert session.janus_handle_id not in self._janus_handle_map assert session.id not in self._id_map self._sessions.add(session) self._janus_handle_map[session.janus_handle_id] = session self._id_map[session.id] = session def remove(self, session): self._sessions.discard(session) def count(self): return len(self._sessions) def clear(self): self._sessions.clear() def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): try: return self._id_map[key] except KeyError: return self._janus_handle_map[key] def __contains__(self, item): return item in self._id_map or item in self._janus_handle_map or item in self._sessions class Operation(object): __slots__ = ('name', 'data') def __init__(self, name, data): self.name = name self.data = data class APIError(Exception): pass class ConnectionHandler(object): def __init__(self, protocol): self.protocol = protocol self.janus_session_id = None self.accounts_map = {} # account ID -> account self.account_handles_map = {} # Janus handle ID -> account self.sessions_map = {} # session ID -> session self.session_handles_map = {} # Janus handle ID -> session self.videoroom_sessions = VideoRoomSessionContainer() # session ID / janus handle ID -> session self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() @property def end_point_address(self): return self.protocol.peer def start(self): self._create_janus_session() def stop(self): if self.ready_event.is_set(): assert self.janus_session_id is not None self.protocol.backend.janus_stop_keepalive(self.janus_session_id) self.protocol.backend.janus_destroy_session(self.janus_session_id) if self.proc is not None: self.proc.kill() self.proc = None # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sessions_map.clear() self.session_handles_map.clear() self.videoroom_sessions.clear() self.janus_session_id = None self.protocol = None def handle_message(self, data): try: request_type = data.pop('sylkrtc') except KeyError: log.warn('Error getting WebSocket message type') return self.ready_event.wait() try: model = sylkrtc_models[request_type] except KeyError: log.warn('Unknown request type: %s' % request_type) return request = model(**data) try: request.validate() except Exception, e: log.error('%s: %s' % (request_type, e)) if request.transaction: self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) return op = Operation(request_type, request) self.operations_queue.send(op) def validate_acl(self, room_uri, from_uri): cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError # internal methods (not overriding / implementing the protocol API) def _send_response(self, response): response.validate() self._send_data(json.dumps(response.to_struct())) def _send_data(self, data): if GeneralConfig.trace_websocket: self.protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data) self.protocol.sendMessage(data, False) def _cleanup_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return self.sessions_map.pop(session.id) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the # same as the account handle, so don't block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None) self.session_handles_map.pop(session.janus_handle_id) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) def _cleanup_videoroom_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return if session in self.videoroom_sessions: self.videoroom_sessions.remove(session) block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) def _maybe_destroy_videoroom(self, videoroom): if videoroom is None: return @run_in_green_thread def f(): if self.protocol is None: # The connection was closed return # destroy the room if empty if not videoroom and not videoroom.destroyed: videoroom.destroyed = True self.protocol.factory.videorooms.remove(videoroom) # create a handle to do the cleanup handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) data = {'request': 'destroy', 'room': videoroom.id} try: block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except Exception: pass block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) log.msg('Video room %s destroyed' % videoroom.uri) # don't destroy it immediately reactor.callLater(5, f) @run_in_green_thread def _create_janus_session(self): if self.ready_event.is_set(): self._send_response(sylkrtc.ReadyEvent()) return try: self.janus_session_id = block_on(self.protocol.backend.janus_create_session()) self.protocol.backend.janus_start_keepalive(self.janus_session_id) except Exception, e: log.warn('Error creating session, disconnecting: %s' % e) self.protocol.disconnect(3000, unicode(e)) return self._send_response(sylkrtc.ReadyEvent()) self.ready_event.set() def _lookup_sip_proxy(self, account): sip_uri = SIPURI.parse('sip:%s' % account) # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: proxy_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: proxy_uri = SIPURI(host=sip_uri.host) settings = SIPSimpleSettings() routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait() if not routes: raise DNSLookupError('no results found') # Get all routes with the highest priority transport and randomly pick one route = random.choice([r for r in routes if r.transport == routes[0].transport]) # Build a proxy URI Sofia-SIP likes return '%s:%s:%d%s' % ('sips' if route.transport == 'tls' else 'sip', route.address, route.port, ';transport=%s' % route.transport if route.transport != 'tls' else '') def _handle_conference_invite(self, originator, room_uri, participants): for p in participants: try: account_info = self.accounts_map[p] except KeyError: continue data = dict(sylkrtc='account_event', account=account_info.id, event='conference_invite', data=dict(originator=dict(uri=originator.id, display_name=originator.display_name), room=room_uri)) log.msg('Video room %s: invitation from %s to %s' % (room_uri, originator.id, account_info.id)) self._send_data(json.dumps(data)) def _handle_janus_event_sip(self, handle_id, event_type, event): # TODO: use a model op = Operation('janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _handle_janus_event_videoroom(self, handle_id, event_type, event): # TODO: use a model op = Operation('janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _operations_handler(self): while True: op = self.operations_queue.wait() handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null) try: handler(op.data) except Exception: log.exception('Unhandled exception in operation "%s"' % op.name) del op, handler def _OH_account_add(self, request): # extract the fields to avoid going through the descriptor several times account = request.account password = request.password display_name = request.display_name user_agent = request.user_agent try: if account in self.accounts_map: raise APIError('Account %s already added' % account) # check if domain is acceptable domain = account.partition('@')[2] if not {'*', domain}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(account, password, display_name, user_agent) self.accounts_map[account_info.id] = account_info except APIError, e: log.error('account_add: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s added using %s at %s' % (account, user_agent, self.end_point_address)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_remove(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map.pop(account) except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) except APIError, e: log.error('account_remove: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s removed' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_register(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) try: proxy = self._lookup_sip_proxy(account) except DNSLookupError: raise APIError('DNS lookup error') handle_id = account_info.janus_handle_id if handle_id is not None: # Destroy the existing plugin handle block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) account_info.janus_handle_id = None # Create a plugin handle handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip) account_info.janus_handle_id = handle_id self.account_handles_map[handle_id] = account_info data = {'request': 'register', 'username': account_info.uri, 'display_name': account_info.display_name, 'user_agent': account_info.user_agent, 'ha1_secret': account_info.password, 'proxy': proxy} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except APIError, e: log.error('account-register: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s will register using %s' % (account, account_info.user_agent)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_unregister(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) account_info.janus_handle_id = None self.account_handles_map.pop(handle_id) except APIError, e: log.error('account-unregister: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s will unregister' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) + def _OH_account_devicetoken(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account + old_token = request.old_token + new_token = request.new_token + + try: + try: + account_info = self.accounts_map[account] + except KeyError: + raise APIError('Unknown account specified: %s' % account) + + storage = TokenStorage() + if old_token is not None: + storage.remove(account, old_token) + log.msg('Removed device token %s... for account %s', (old_token[:5], account)) + if new_token is not None: + storage.add(account, new_token) + log.msg('Added device token %s... for account %s', (new_token[:5], account)) + except APIError, e: + log.error('account-devicetoken: %s' % e) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) + def _OH_session_create(self, request): # extract the fields to avoid going through the descriptor several times account = request.account session = request.session uri = request.uri sdp = request.sdp try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.sessions_map: raise APIError('Session ID (%s) already in use' % session) # Create a new plugin handle and 'register' it, without actually doing so handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip) try: proxy = self._lookup_sip_proxy(account_info.id) except DNSLookupError: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) raise APIError('DNS lookup error') data = {'request': 'register', 'username': account_info.uri, 'display_name': account_info.display_name, 'user_agent': account_info.user_agent, 'ha1_secret': account_info.password, 'proxy': proxy, 'send_register': False} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) session_info = SIPSessionInfo(session) session_info.janus_handle_id = handle_id session_info.init_outgoing(account, uri) # TODO: create a "SessionContainer" object combining the 2 self.sessions_map[session_info.id] = session_info self.session_handles_map[handle_id] = session_info data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'} jsep = {'type': 'offer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) except APIError, e: log.error('session-create: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Outgoing session %s from %s to %s created using %s from %s' % (session, account, uri, account_info.user_agent, self.end_point_address)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_answer(self, request): # extract the fields to avoid going through the descriptor several times session = request.session sdp = request.sdp try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session') if session_info.state != 'connecting': raise APIError('Invalid state for session answer') data = {'request': 'accept'} jsep = {'type': 'answer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) except APIError, e: log.error('session-answer: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('%s answered session %s' % (session_info.account_id, session)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_trickle(self, request): # extract the fields to avoid going through the descriptor several times session = request.session candidates = [c.to_struct() for c in request.candidates] try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state == 'terminated': raise APIError('Session is terminated') try: account_info = self.accounts_map[session_info.account_id] except KeyError: raise APIError('Unknown account specified: %s' % session_info.account_id) block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) except APIError, e: #log.error('session-trickle: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: if candidates: if not session_info.ice_media_negotiation_started: log.msg('Session %s: ICE negotiation started by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent)) session_info.ice_media_negotiation_started = True else: log.msg('Session %s: ICE negotiation ended by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent)) session_info.ice_media_negotiation_ended = True self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_terminate(self, request): # extract the fields to avoid going through the descriptor several times session = request.session try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state) if session_info.direction == 'incoming' and session_info.state == 'connecting': data = {'request': 'decline', 'code': 486} else: data = {'request': 'hangup'} block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data)) except APIError, e: log.error('session-terminate: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('%s terminated session %s' % (session_info.account_id, session)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_videoroom_join(self, request): account = request.account session = request.session uri = request.uri sdp = request.sdp videoroom = None try: try: self.validate_acl(uri, account) except ACLValidationError: raise APIError('%s is not allowed to join room %s' % (account, uri)) try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.videoroom_sessions: raise APIError('Video room session ID (%s) already in use' % session) handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) # create the room if it doesn't exist try: videoroom = self.protocol.factory.videorooms[uri] except KeyError: videoroom = VideoRoom(uri) self.protocol.factory.videorooms.add(videoroom) data = {'request': 'create', 'room': videoroom.id, 'publishers': 10, 'record': videoroom.record, 'rec_dir': videoroom.rec_dir } try: block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except Exception, e: code = getattr(e, 'code', -1) if code != 427: # 417 == room exists block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) raise APIError(str(e)) # join the room data = {'request': 'joinandconfigure', 'room': videoroom.id, 'ptype': 'publisher', 'audio': True, 'video': True } if account_info.display_name: data['display'] = account_info.display_name jsep = {'type': 'offer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) videoroom_session = VideoRoomSessionInfo(session) videoroom_session.janus_handle_id = handle_id videoroom_session.initialize(account, 'publisher', videoroom) self.videoroom_sessions.add(videoroom_session) except APIError, e: log.error('videoroom-join: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) self._maybe_destroy_videoroom(videoroom) else: log.msg('Video room %s: joined by %s using %s (%d participants present) from %s' % (videoroom.uri, account, account_info.user_agent, self.videoroom_sessions.count(), self.end_point_address)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='progress')) self._send_data(json.dumps(data)) def _OH_videoroom_ctl(self, request): if request.option == 'trickle': trickle = request.trickle if not trickle: log.error('videoroom-ctl: missing field') return candidates = [c.to_struct() for c in trickle.candidates] session = trickle.session or request.session try: try: videoroom_session = self.videoroom_sessions[session] except KeyError: raise APIError('trickle: unknown video room session ID specified: %s' % session) try: account_info = self.accounts_map[videoroom_session.account_id] except KeyError: raise APIError('Unknown account specified: %s' % videoroom_session.account_id) block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates)) except APIError, e: #log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: if candidates: if not videoroom_session.ice_media_negotiation_started: log.msg('Video room %s: ICE negotiation started by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent)) videoroom_session.ice_media_negotiation_started = True else: log.msg('Video room %s: ICE negotiation ended by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent)) videoroom_session.ice_media_negotiation_ended = True self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-attach': feed_attach = request.feed_attach if not feed_attach: log.error('videoroom-ctl: missing field') return try: if feed_attach.session in self.videoroom_sessions: raise APIError('feed-attach: video room session ID (%s) already in use' % feed_attach.session) # get the 'base' session, the one used to join and publish try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('feed-attach: unknown video room session ID specified: %s' % request.session) # get the publisher's session try: publisher_session = base_session.room[feed_attach.publisher] except KeyError: raise APIError('feed-attach: unknown publisher video room session ID specified: %s' % feed_attach.publisher) if publisher_session.publisher_id is None: raise APIError('feed-attach: video room session ID does not have a publisher ID' % feed_attach.publisher) handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) # join the room as a listener data = {'request': 'join', 'room': base_session.room.id, 'ptype': 'listener', 'feed': publisher_session.publisher_id} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) videoroom_session = VideoRoomSessionInfo(feed_attach.session) videoroom_session.janus_handle_id = handle_id videoroom_session.parent_session = base_session videoroom_session.publisher_id = publisher_session.id videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room) self.videoroom_sessions.add(videoroom_session) base_session.feeds[publisher_session.publisher_id] = publisher_session.id except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Video room %s: %s attached to %s' % (base_session.room.uri, base_session.account_id, feed_attach.publisher)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-answer': feed_answer = request.feed_answer if not feed_answer: log.error('videoroom-ctl: missing field') return try: try: videoroom_session = self.videoroom_sessions[request.feed_answer.session] except KeyError: raise APIError('feed-answer: unknown video room session ID specified: %s' % feed_answer.session) data = {'request': 'start', 'room': videoroom_session.room.id} jsep = {'type': 'answer', 'sdp': feed_answer.sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep)) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-detach': feed_detach = request.feed_detach if not feed_detach: log.error('videoroom-ctl: missing field') return try: try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('feed-detach: unknown video room session ID specified: %s' % request.session) try: videoroom_session = self.videoroom_sessions[feed_detach.session] except KeyError: raise APIError('feed-detach: unknown video room session ID specified: %s' % feed_detach.session) data = {'request': 'leave'} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data)) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) block_on(self.protocol.backend.janus_detach(self.janus_session_id, videoroom_session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(videoroom_session.janus_handle_id, None) self.videoroom_sessions.remove(videoroom_session) try: janus_publisher_id = next(k for k, v in base_session.feeds.iteritems() if v == videoroom_session.publisher_id) except StopIteration: pass else: base_session.feeds.pop(janus_publisher_id) elif request.option == 'invite-participants': invite_participants = request.invite_participants if not invite_participants: log.error('videoroom-ctl: missing field') return try: try: base_session = self.videoroom_sessions[request.session] account_info = self.accounts_map[base_session.account_id] except KeyError: raise APIError('invite-participants: unknown video room session ID specified: %s' % request.session) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) for conn in self.protocol.factory.connections.difference([self]): if conn.connection_handler: conn.connection_handler._handle_conference_invite(account_info, base_session.room.uri, invite_participants.participants) else: log.error('videoroom-ctl: unsupported option: %s' % request.option) def _OH_videoroom_terminate(self, request): session = request.session try: try: videoroom_session = self.videoroom_sessions[session] except KeyError: raise APIError('Unknown video room session ID specified: %s' % session) data = {'request': 'leave'} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data)) except APIError, e: log.error('videoroom-terminate: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Video room %s: %s left the room (%d participants present)' % (videoroom_session.room.uri, videoroom_session.account_id, self.videoroom_sessions.count())) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='terminated')) self._send_data(json.dumps(data)) self._cleanup_videoroom_session(videoroom_session) self._maybe_destroy_videoroom(videoroom_session.room) # Event handlers def _OH_janus_event_sip(self, data): handle_id = data['handle_id'] event_type = data['event_type'] event = data['event'] if event_type == 'event': self._janus_event_plugin_sip(data) elif event_type == 'webrtcup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return session_info.state = 'established' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) direction = session_info.direction.title() log.msg('%s session %s from %s to %s state: %s' % (direction, session_info.id, session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri, session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri, session_info.state)) # TODO: SessionEvent model self._send_data(json.dumps(data)) elif event_type == 'hangup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return if session_info.state != 'terminated': session_info.state = 'terminated' code = event.get('code', 0) reason = event.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason)) # TODO: SessionEvent model self._send_data(json.dumps(data)) self._cleanup_session(session_info) direction = session_info.direction.title() log.msg('%s session %s from %s to %s terminated (%s)' % (direction, session_info.id, session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri, session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri, reason)) elif event_type in ('media', 'detached'): # ignore pass elif event_type == 'slowlink': # TODO something pass else: log.warn('Received unexpected event type: %s' % event_type) def _janus_event_plugin_sip(self, data): handle_id = data['handle_id'] event = data['event'] plugin_data = event['plugindata'] assert(plugin_data['plugin'] == 'janus.plugin.sip') event_data = event['plugindata']['data'] assert(event_data.get('sip', '') == 'event') if 'result' not in event_data: log.warn('Unexpected event: %s' % event) return event_data = event_data['result'] jsep = event.get('jsep', None) event_type = event_data['event'] if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'): # skip 'registered' events from session handles if event_type == 'registered' and event_data['register_sent'] in (False, 'false'): return # account event try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return if event_type == 'incomingcall': originator_uri = SIP_PREFIX_RE.sub('', event_data['username']) originator_display_name = event_data.get('displayname', '').replace('"', '') jsep = event.get('jsep', None) assert jsep is not None session_id = uuid.uuid4().hex session = SIPSessionInfo(session_id) session.janus_handle_id = handle_id session.init_incoming(account_info.id, originator_uri, originator_display_name) self.sessions_map[session_id] = session self.session_handles_map[handle_id] = session data = dict(sylkrtc='account_event', account=account_info.id, session=session_id, event='incoming_session', data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp'])) log.msg('Incoming session %s from %s to %s created' % (session.id, session.remote_identity.uri, session.local_identity.uri)) else: registration_state = event_type if registration_state == 'registration_failed': registration_state = 'failed' if account_info.registration_state == registration_state: return account_info.registration_state = registration_state registration_data = dict(state=registration_state) if registration_state == 'failed': code = event_data['code'] reason = event_data['reason'] registration_data['reason'] = '%d %s' % (code, reason) log.msg('Account %s registration failed: %s (%s)' % (account_info.id, code, reason)) elif registration_state == 'registered': log.msg('Account %s registered using %s from %s' % (account_info.id, account_info.user_agent, self.end_point_address)) data = dict(sylkrtc='account_event', account=account_info.id, event='registration_state', data=registration_data) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type in ('calling', 'accepted', 'hangup'): # session event try: session_info = self.session_handles_map[handle_id] except KeyError: log.warn('Could not find session for handle ID %s' % handle_id) return if event_type == 'hangup' and session_info.state == 'terminated': return if event_type == 'calling': session_info.state = 'progress' elif event_type == 'accepted': session_info.state = 'accepted' elif event_type == 'hangup': session_info.state = 'terminated' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) direction = session_info.direction.title() log.msg('%s session %s from %s to %s state: %s' % (direction, session_info.id, session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri, session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri, session_info.state)) if session_info.state == 'accepted' and session_info.direction == 'outgoing': assert jsep is not None data['data']['sdp'] = jsep['sdp'] elif session_info.state == 'terminated': code = event_data.get('code', 0) reason = event_data.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data['data']['reason'] = reason # TODO: SessionEvent model self._send_data(json.dumps(data)) if session_info.state == 'terminated': self._cleanup_session(session_info) direction = session_info.direction.title() log.msg('%s session %s from %s to %s terminated (%s)' % (direction, session_info.id, session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri, session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri, reason)) # check if missed incoming call if session_info.direction == 'incoming' and code == 487: data = dict(sylkrtc='account_event', account=session_info.account_id, event='missed_session', data=dict(originator=session_info.remote_identity.__dict__)) log.msg('Incoming session from %s to %s was not answered ' % (session_info.remote_identity.uri, session_info.local_identity.uri)) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type == 'missed_call': try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return originator_uri = SIP_PREFIX_RE.sub('', event_data['caller']) originator_display_name = event_data.get('displayname', '').replace('"', '') # We have no session, so create an identity object by hand originator = SessionPartyIdentity(originator_uri, originator_display_name) data = dict(sylkrtc='account_event', account=account_info.id, event='missed_session', data=dict(originator=originator.__dict__)) log.msg('Incoming session from %s missed' % originator.uri) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'): # ignore pass else: log.warn('Unexpected SIP plugin event type: %s' % event_type) def _OH_janus_event_videoroom(self, data): handle_id = data['handle_id'] event_type = data['event_type'] if event_type == 'event': self._janus_event_plugin_videoroom(data) elif event_type == 'webrtcup': try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return base_session = videoroom_session.parent_session if base_session is None: data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='established')) else: # this is a subscriber session data = dict(sylkrtc='videoroom_event', session=base_session.id, event='feed_established', data=dict(state='established', subscription=videoroom_session.id)) log.msg('Video room %s: session established to %s' % (videoroom_session.room.uri, videoroom_session.account_id)) self._send_data(json.dumps(data)) elif event_type == 'hangup': try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find video room session for handle ID %s' % handle_id) return log.msg('Video room %s: session terminated to %s' % (videoroom_session.room.uri, videoroom_session.account_id)) self._cleanup_videoroom_session(videoroom_session) self._maybe_destroy_videoroom(videoroom_session.room) elif event_type in ('media', 'detached'): # ignore pass elif event_type == 'slowlink': try: videoroom_session = (session_info for session_info in self.videoroom_sessions if session_info.janus_handle_id == handle_id).next() except StopIteration: log.warn('Could not find video room session for Janus handle ID %s' % handle_id) else: try: account_info = self.accounts_map[videoroom_session.account_id] except KeyError: raise APIError('Unknown account specified: %s' % videoroom_session.account_id) try: uplink = data['event']['uplink'] except KeyError: log.warn('Could not find uplink in slowlink event data') else: if uplink: log.msg('Video room %s: %s has poor download connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent)) else: log.msg('Video room %s: %s has poor upload connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent)) else: log.warn('Received unexpected event type %s: data=%s' % (event_type, data)) def _janus_event_plugin_videoroom(self, data): handle_id = data['handle_id'] event = data['event'] plugin_data = event['plugindata'] assert(plugin_data['plugin'] == 'janus.plugin.videoroom') event_data = event['plugindata']['data'] assert 'videoroom' in event_data event_type = event_data['videoroom'] if event_type == 'joined': # a join request succeeded, this is a publisher try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find video room session for handle ID %s' % handle_id) return room = videoroom_session.room videoroom_session.publisher_id = event_data['id'] room.add(videoroom_session) jsep = event.get('jsep', None) assert jsep is not None data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='accepted', sdp=jsep['sdp'])) self._send_data(json.dumps(data)) # send information about existing publishers publishers = [] for p in event_data['publishers']: publisher_id = p['id'] publisher_display = p.get('display', '') try: publisher_session = room[publisher_id] except KeyError: log.warn('Could not find matching session for publisher %s' % publisher_id) continue item = { 'id': publisher_session.id, 'uri': publisher_session.account_id, 'display_name': publisher_display, } publishers.append(item) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='initial_publishers', data=dict(publishers=publishers)) self._send_data(json.dumps(data)) elif event_type == 'event': if 'publishers' in event_data: try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return room = videoroom_session.room # send information about new publishers publishers = [] for p in event_data['publishers']: publisher_id = p['id'] publisher_display = p.get('display', '') try: publisher_session = room[publisher_id] except KeyError: log.warn('Could not find matching session for publisher %s' % publisher_id) continue item = { 'id': publisher_session.id, 'uri': publisher_session.account_id, 'display_name': publisher_display, } publishers.append(item) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='publishers_joined', data=dict(publishers=publishers)) self._send_data(json.dumps(data)) elif 'leaving' in event_data: try: base_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find video room session for handle ID %s' % handle_id) return janus_publisher_id = event_data['leaving'] try: publisher_id = base_session.feeds.pop(janus_publisher_id) except KeyError: return data = dict(sylkrtc='videoroom_event', session=base_session.id, event='publishers_left', data=dict(publishers=[publisher_id])) self._send_data(json.dumps(data)) elif {'started', 'unpublished', 'left'}.intersection(event_data): # ignore pass else: log.warn('Received unexpected plugin "event" event') elif event_type == 'attached': # sent when a feed is subscribed for a given publisher try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return # get the session which originated the subscription base_session = videoroom_session.parent_session assert base_session is not None jsep = event.get('jsep', None) assert jsep is not None assert jsep['type'] == 'offer' data = dict(sylkrtc='videoroom_event', session=base_session.id, event='feed_attached', data=dict(sdp=jsep['sdp'], subscription=videoroom_session.id)) self._send_data(json.dumps(data)) elif event_type == 'slow_link': pass else: log.warn('Received unexpected plugin event type %s: plugin_data=%s, event_data=%s' % (event_type, plugin_data, event_data)) diff --git a/sylk/applications/webrtcgateway/web/push.py b/sylk/applications/webrtcgateway/web/push.py new file mode 100644 index 0000000..511da3c --- /dev/null +++ b/sylk/applications/webrtcgateway/web/push.py @@ -0,0 +1,78 @@ + +import json + +from sipsimple.util import ISOTimestamp +from twisted.internet import defer, reactor +from twisted.web.client import Agent +from twisted.web.iweb import IBodyProducer +from twisted.web.http_headers import Headers +from zope.interface import implementer + +from sylk.applications.webrtcgateway.configuration import GeneralConfig +from sylk.applications.webrtcgateway.logger import log + +__all__ = ['incoming_session', 'missed_session'] + + +agent = Agent(reactor) +headers = Headers({'User-Agent': ['SylkServer'], + 'Content-Type': ['application/json'], + 'Authorization': ['key=%s' % GeneralConfig.firebase_server_key]}) +FIREBASE_API_URL = 'https://fcm.googleapis.com/fcm/send' + + +@implementer(IBodyProducer) +class StringProducer(object): + def __init__(self, data): + self.body = data + self.length = len(data) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +def incoming_session(originator, destination, tokens): + for token in tokens: + data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}} + data['notification']['body'] = 'Incoming session from %s' % originator + data['priority'] = 'high' + data['time_to_live'] = 60 # don't deliver if phone is out for over a minute + data['data']['sylkrtc']['event'] = 'incoming_session' + data['data']['sylkrtc']['originator'] = originator + data['data']['sylkrtc']['destination'] = destination + data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now()) + _send_push_notification(json.dumps(data)) + + +def missed_session(originator, destination, tokens): + for token in tokens: + data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}} + data['notification']['body'] = 'Missed session from %s' % originator + data['priority'] = 'high' + # No TTL, default is 4 weeks + data['data']['sylkrtc']['event'] = 'missed_session' + data['data']['sylkrtc']['originator'] = originator + data['data']['sylkrtc']['destination'] = destination + data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now()) + _send_push_notification(json.dumps(data)) + + +@defer.inlineCallbacks +def _send_push_notification(payload): + if GeneralConfig.firebase_server_key: + try: + r = yield agent.request('POST', FIREBASE_API_URL, headers, StringProducer(payload)) + except Exception, e: + log.msg('Error sending Firebase message: %s', e) + else: + if r.code != 200: + log.warn('Error sending Firebase message: %s' % r.phrase) + else: + log.warn('Cannot send push notification: no Firebase server key configured') diff --git a/sylk/applications/webrtcgateway/web/storage.py b/sylk/applications/webrtcgateway/web/storage.py new file mode 100644 index 0000000..c890405 --- /dev/null +++ b/sylk/applications/webrtcgateway/web/storage.py @@ -0,0 +1,43 @@ + +__all__ = ['TokenStorage'] + +import cPickle as pickle +import os + +from application.python.types import Singleton +from collections import defaultdict +from sipsimple.threading import run_in_thread + +from sylk.configuration import ServerConfig + + +class TokenStorage(object): + __metaclass__ = Singleton + + def __init__(self): + self._tokens = defaultdict(set) + + @run_in_thread('file-io') + def _save(self): + with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: + pickle.dump(self._tokens, f) + + @run_in_thread('file-io') + def load(self): + try: + tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) + except Exception: + pass + else: + self._tokens.update(tokens) + + def __getitem__(self, key): + return self._tokens[key] + + def add(self, account, token): + self._tokens[account].add(token) + self._save() + + def remove(self, account, token): + self._tokens[account].discard(token) + self._save() diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample index 8b3ca9c..3254c95 100644 --- a/webrtcgateway.ini.sample +++ b/webrtcgateway.ini.sample @@ -1,48 +1,57 @@ ; SylkServer WebRTC gateway configuration file ; ; For the gateway to work Janus needs to be properly installed and configured, ; please refer to README.webrtc for detailed instructions ; [General] ; List of allowed web origins. The connection origin (Origin header in the ; HTTP request) will be checked against the list defined here, if the domain ; is no allowed the connection will be refused. ; * (the default) means any ; web_origins = * ; Proxy used for outbound SIP traffic ; outbound_sip_proxy = ; List of allowed SIP domains for managing accounts ; sip_domains = * ; Boolean indicating if the WebSocket messages sent to/from clients should be logged ; to a file ; trace_websocket = False ; WebSocket Ping frames are sent at the configured interval, this helps detect dead ; client connections ; websocket_ping_interval = 120 +; IP and port for the HTTP management interface +; http_management_interface = 127.0.0.1:20888 + +; Shared secret for the HTTP management interface (Authorization: key=THE_KEY) +; http_management_auth_secret = + +; Server key for Firebase Cloud Messaging +; firebase_server_key = + [Janus] ; URL pointing to the Janus API endpoint (only WebSocket is supported) ; api_url = ws://127.0.0.1:8188 ; API secret shared with Janus (must match the value in janus.cfg) ; A random UUID value is recommended, a new value can be generated with ; the following command: ; python -c 'import uuid; print(uuid.uuid4().hex)' api_secret = 0745f2f74f34451c89343afcdcae5809 ; Boolean indicating if the messages between SylkServer and Janus should be logged to ; a file ; trace_janus = False ; Per room configuration options ; [room1@videoconference.example.com] ; record = True ; access_policy = deny, allow ; deny = all ; allow = domain1.com, test1@example.com, test2@example.com