diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index 8696eb3..516926c 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,27 +1,28 @@ # Copyright (C) 2015 AG Projects. See LICENSE for details # from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import StringList from sylk.configuration.datatypes import SIPProxyAddress class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_websocket = False + websocket_ping_interval = 120 class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False diff --git a/sylk/applications/webrtcgateway/web/__init__.py b/sylk/applications/webrtcgateway/web/__init__.py index 48df4fb..12bfb67 100644 --- a/sylk/applications/webrtcgateway/web/__init__.py +++ b/sylk/applications/webrtcgateway/web/__init__.py @@ -1,91 +1,93 @@ # Copyright (C) 2015 AG Projects. See LICENSE for details # from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource from sylk import __version__ as sylk_version from sylk.applications.webrtcgateway.configuration import GeneralConfig, JanusConfig from sylk.applications.webrtcgateway.janus.backend import JanusBackend from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.web.api import SYLK_WS_PROTOCOL, SylkWebSocketServerFactory from sylk.applications.webrtcgateway.websocket_logger import Logger as WSLogger from sylk.resources import Resources from sylk.web import Klein, StaticFileResource, server class WebRTCGatewayWeb(object): __metaclass__ = Singleton app = Klein() _resource = None _ws_resource = None def __init__(self, ws_factory): self._ws_resource = WebSocketResource(ws_factory) def resource(self): if self._resource is None: self._resource = self.app.resource() return self._resource @app.route('/') def index(self, request): path = Resources.get('html/webrtcgateway/index.html') r = StaticFileResource(path) r.isLeaf = True return r @app.route('/ws') def ws(self, request): return self._ws_resource @app.route('/js/', branch=True) def js(self, request): path = Resources.get('html/webrtcgateway/js/') return StaticFileResource(path) @app.route('/test/', branch=True) def test(self, request): path = Resources.get('html/webrtcgateway/test/') return StaticFileResource(path) class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None self.ws_logger = WSLogger() def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version, debug=False) - self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins) + self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, + autoPingInterval=GeneralConfig.websocket_ping_interval, + autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.factory.ws_logger = self.ws_logger self.web = WebRTCGatewayWeb(self.factory) server.register_resource('webrtcgateway', self.web.resource()) log.msg('WebSocket handler started at %s' % ws_url) log.msg('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.msg('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.msg('Using Janus API: %s' % JanusConfig.api_url) self.ws_logger.start() self.backend = JanusBackend() self.backend.start() self.factory.backend = self.backend def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.failConnection() self.factory = None if self.backend is not None: self.backend.stop() self.backend = None self.ws_logger.stop() diff --git a/sylk/applications/webrtcgateway/web/api.py b/sylk/applications/webrtcgateway/web/api.py index 59a2d61..88b310c 100644 --- a/sylk/applications/webrtcgateway/web/api.py +++ b/sylk/applications/webrtcgateway/web/api.py @@ -1,781 +1,783 @@ # Copyright (C) 2015 AG Projects. See LICENSE for details # import json import random import re import uuid from application.python import Null from application.notification import IObserver, NotificationCenter from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory from autobahn.websocket import http from eventlib import coros, proc from eventlib.twistedutil import block_on from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.applications.webrtcgateway.configuration import GeneralConfig from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.util import GreenEvent SYLK_WS_PROTOCOL = 'sylkRTC-1' SIP_PREFIX_RE = re.compile('^sips?:') class AccountInfo(object): def __init__(self, id, password): self.id = id self.password = password self.registration_state = None self.janus_handle_id = None @property def uri(self): return 'sip:%s' % self.id class SessionPartyIdentity(object): def __init__(self, uri, display_name=''): self.uri = uri self.display_name = display_name class SessionInfoBase(object): type = None # override in subclass def __init__(self, id): self.id = id self.direction = None self.state = None self.account_id = None self.local_identity = None # instance of SessionPartyIdentity self.remote_identity = None # instance of SessionPartyIdentity def init_outgoing(self, account_id, destination): self.account_id = account_id self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account_id, originator, originator_display_name=''): self.account_id = account_id self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class JanusSessionInfo(SessionInfoBase): type = 'janus' def __init__(self, id): super(JanusSessionInfo, self).__init__(id) self.janus_handle_id = None class Operation(object): __slots__ = ('name', 'data') def __init__(self, name, data): self.name = name self.data = data class APIError(Exception): pass class SylkWebSocketServerProtocol(WebSocketServerProtocol): janus_session_id = None accounts_map = None # account ID -> account account_handles_map = None # Janus handle ID -> account sessions_map = None # session ID -> session session_handles_map = None # Janus handle ID -> session ready_event = None resolver = None proc = None operations_queue = None def onConnect(self, request): log.msg('Incoming connection from %s (origin %s)' % (request.peer, request.origin)) if SYLK_WS_PROTOCOL not in request.protocols: log.msg('Rejecting connection, remote does not support our sub-protocol') raise http.HttpException(http.NOT_ACCEPTABLE[0], 'No compatible protocol specified') if not self.backend.ready: log.msg('Rejecting connection, backend is not connected') raise http.HttpException(http.SERVICE_UNAVAILABLE[0], 'Backend is not connected') return SYLK_WS_PROTOCOL def onOpen(self): self.factory.connections.add(self) self.accounts_map = {} self.account_handles_map = {} self.sessions_map = {} self.session_handles_map = {} self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() self._create_janus_session() def onMessage(self, payload, isBinary): if isBinary: log.warn('Received invalid binary message') return if GeneralConfig.trace_websocket: self.factory.ws_logger.msg("IN", ISOTimestamp.now(), payload) try: data = json.loads(payload) except Exception, e: log.warn('Error parsing WebSocket payload: %s' % e) return try: request_type = data.pop('sylkrtc') except KeyError: log.warn('Error getting WebSocket message type') return self.ready_event.wait() op = Operation(request_type.lower(), data) self.operations_queue.send(op) def onClose(self, wasClean, code, reason): if self.ready_event is None: # Very early connection closed, onOpen wasn't even called return log.msg('Connection from %s closed' % self.transport.getPeer()) self.factory.connections.discard(self) if self.ready_event.is_set(): assert self.janus_session_id is not None self.backend.janus_stop_keepalive(self.janus_session_id) self.backend.janus_destroy_session(self.janus_session_id) if self.proc is not None: self.proc.kill() self.proc = None # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sessions_map.clear() self.session_handles_map.clear() self.janus_session_id = None def disconnect(self, code=1000, reason=u''): self.sendClose(code, reason) + # internal methods (not overriding / implementing the protocol API) + def _send_data(self, data): if GeneralConfig.trace_websocket: self.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data) self.sendMessage(data, False) def _cleanup_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return self.sessions_map.pop(session.id) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the # same as the account handle, so don't block_on(self.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) self.backend.janus_set_event_handler(session.janus_handle_id, None) self.session_handles_map.pop(session.janus_handle_id) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) @run_in_green_thread def _create_janus_session(self): if self.ready_event.is_set(): data = dict(sylkrtc='event', event='ready') self._send_data(json.dumps(data)) return try: self.janus_session_id = block_on(self.backend.janus_create_session()) self.backend.janus_start_keepalive(self.janus_session_id) except Exception, e: log.warn('Error creating session, disconnecting: %s' % e) self.disconnect(3000, unicode(e)) return data = dict(sylkrtc='event', event='ready') self._send_data(json.dumps(data)) self.ready_event.set() def _lookup_sip_proxy(self, account): sip_uri = SIPURI.parse('sip:%s' % account) # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: proxy_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: proxy_uri = SIPURI(host=sip_uri.host) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait() except DNSLookupError, e: raise DNSLookupError('DNS lookup error: %s' % e) if not routes: raise DNSLookupError('DNS lookup error: no results found') # Get all routes with the highest priority transport and randomly pick one route = random.choice([r for r in routes if r.transport==routes[0].transport]) # Build a proxy URI Sofia-SIP likes return '%s:%s:%d%s' % ('sips' if route.transport=='tls' else 'sip', route.address, route.port, ';transport=%s' % route.transport if route.transport != 'tls' else '') def _handle_janus_event(self, handle_id, event_type, event): op = Operation('janus-event', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _operations_handler(self): while True: op = self.operations_queue.wait() handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null) try: handler(op.data) except Exception: log.err() del op del handler def _OH_account_add(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: account = data['account'] password = data['password'] except KeyError: raise APIError('Invalid parameters: "account" and "password" must be specified') if account in self.accounts_map: log.warn('Account %s already added' % account) data = dict(sylkrtc='error', transaction=transaction, error='Account already added') self._send_data(json.dumps(data)) return # Validate URI uri = 'sip:%s' % account try: sip_uri = SIPURI.parse(uri) except SIPCoreError: raise APIError('Invalid account specified: %s' % account) if not {'*', sip_uri.host}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % sip_uri.host) # Create and store our mapping account_info = AccountInfo(account, password) self.accounts_map[account_info.id] = account_info data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Account %s added' % account) except APIError, e: log.error('account_add: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in account_add: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_account_remove(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: account = data['account'] except KeyError: raise APIError('Invalid parameters: "account" must be specified') try: account_info = self.accounts_map.pop(account) except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) self.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Account %s removed' % account) except APIError, e: log.error('account_remove: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in account_remove: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_account_register(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: account = data['account'] except KeyError: raise APIError('Invalid parameters: "account" must be specified') try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) proxy = self._lookup_sip_proxy(account) handle_id = account_info.janus_handle_id if handle_id is not None: # Destroy the existing plugin handle block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) self.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) account_info.janus_handle_id = None # Create a plugin handle handle_id = block_on(self.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.backend.janus_set_event_handler(handle_id, self._handle_janus_event) account_info.janus_handle_id = handle_id self.account_handles_map[handle_id] = account_info data = {'request': 'register', 'username': account_info.uri, 'ha1_secret': account_info.password, 'proxy': proxy} block_on(self.backend.janus_message(self.janus_session_id, handle_id, data)) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Account %s will register' % account) except APIError, e: log.error('account-register: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in account-register: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_account_unregister(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: account = data['account'] except KeyError: raise APIError('Invalid parameters: "account" must be specified') try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) self.backend.janus_set_event_handler(handle_id, None) account_info.janus_handle_id = None self.account_handles_map.pop(handle_id) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Account %s will unregister' % account) except APIError, e: log.error('account-unregister: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in account-unregister: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_session_create(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: account = data['account'] session = data['session'] uri = data['uri'] sdp = data['sdp'] except KeyError: raise APIError('Invalid parameters: "account", "session", "uri" and "sdp" must be specified') try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.sessions_map: raise APIError('Session ID (%s) already in use' % session) # Create a new plugin handle and 'register' it, without actually doing so handle_id = block_on(self.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.backend.janus_set_event_handler(handle_id, self._handle_janus_event) try: proxy = self._lookup_sip_proxy(account_info.id) except DNSLookupError: block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) self.backend.janus_set_event_handler(handle_id, None) raise APIError('DNS lookup error') account_uri = 'sip:%s' % account_info.id data = {'request': 'register', 'username': account_uri, 'ha1_secret': account_info.password, 'proxy': proxy, 'send_register': False} block_on(self.backend.janus_message(self.janus_session_id, handle_id, data)) session_info = JanusSessionInfo(session) session_info.janus_handle_id = handle_id session_info.init_outgoing(account, uri) self.sessions_map[session_info.id] = session_info self.session_handles_map[handle_id] = session_info data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri)} jsep = {'type': 'offer', 'sdp': sdp} block_on(self.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Outgoing session %s from %s to %s created' % (session, account, uri)) except APIError, e: log.error('session-create: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in session-create: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_session_answer(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: session = data['session'] sdp = data['sdp'] except KeyError: raise APIError('Invalid parameters: "session" and "sdp" must be specified') try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session') if session_info.state != 'connecting': raise APIError('Invalid state for session answer') data = {'request': 'accept'} jsep = {'type': 'answer', 'sdp': sdp} block_on(self.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('%s answered session %s' % (session_info.account_id, session)) except APIError, e: log.error('session-answer: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in session-answer: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_session_trickle(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: session = data['session'] candidates = data['candidates'] except KeyError: raise APIError('Invalid parameters: "session" and "candidates" must be specified') try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state == 'terminated': raise APIError('Session is terminated') block_on(self.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('Trickled ICE candidate(s) for session %s' % session) except APIError, e: log.error('session-trickle: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in session-trickle: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) def _OH_session_terminate(self, data): transaction = data.get('transaction', None) if transaction is None: log.warn('Transaction not specified!') return try: try: session = data['session'] except KeyError: raise APIError('Invalid parameters: "session" must be specified') try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state) if session_info.direction == 'incoming' and session_info.state == 'connecting': data = {'request': 'decline', 'code': 486} else: data = {'request': 'hangup'} block_on(self.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data)) data = dict(sylkrtc='ack', transaction=transaction) self._send_data(json.dumps(data)) log.msg('%s terminated session %s' % (session_info.account_id, session)) except APIError, e: log.error('session-terminate: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) except Exception, e: log.error('Unexpected error in session-terminate: %s' % e) data = dict(sylkrtc='error', transaction=transaction, error=str(e)) self._send_data(json.dumps(data)) # Event handlers def _OH_janus_event(self, data): handle_id = data['handle_id'] event_type = data['event_type'] event = data['event'] if event_type == 'event': event_data = event['plugindata']['data'] if 'result' in event_data: jsep = event.get('jsep', None) event_type = event_data['result']['event'] if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'): # skip 'registered' events from session handles if event_type == 'registered' and event_data['result']['register_sent'] in (False, 'false'): return # account event try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return if event_type == 'incomingcall': originator_uri = SIP_PREFIX_RE.sub('', event_data['result']['username']) originator_display_name = event_data['result'].get('displayname', '').replace('"', '') jsep = event.get('jsep', None) assert jsep is not None session_id = uuid.uuid4().hex session = JanusSessionInfo(session_id) session.janus_handle_id = handle_id session.init_incoming(account_info.id, originator_uri, originator_display_name) self.sessions_map[session_id] = session self.session_handles_map[handle_id] = session data = dict(sylkrtc='account_event', account=account_info.id, session=session_id, event='incoming_session', data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp'])) log.msg('Incoming session %s %s <-> %s created' % (session.id, session.remote_identity.uri, session.local_identity.uri)) else: registration_state = event_type if registration_state == 'registration_failed': registration_state = 'failed' if account_info.registration_state == registration_state: return account_info.registration_state = registration_state registration_data = dict(state=registration_state) if registration_state == 'failed': code = event_data['result']['code'] reason = event_data['result']['reason'] registration_data['reason'] = '%d %s' % (code, reason) data = dict(sylkrtc='account_event', account=account_info.id, event='registration_state', data=registration_data) log.msg('Account %s registration state changed to %s' % (account_info.id, registration_state)) self._send_data(json.dumps(data)) elif event_type in ('calling', 'accepted', 'hangup'): # session event try: session_info = self.session_handles_map[handle_id] except KeyError: log.warn('Could not find session for handle ID %s' % handle_id) return if event_type == 'hangup' and session_info.state == 'terminated': return if event_type == 'calling': session_info.state = 'progress' elif event_type == 'accepted': session_info.state = 'accepted' elif event_type == 'hangup': session_info.state = 'terminated' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) if session_info.state == 'accepted' and session_info.direction == 'outgoing': assert jsep is not None data['data']['sdp'] = jsep['sdp'] elif session_info.state == 'terminated': code = event_data['result'].get('code', 0) reason = event_data['result'].get('reason', 'Unknown') reason = '%d %s' % (code, reason) data['data']['reason'] = reason self._send_data(json.dumps(data)) if session_info.state == 'terminated': self._cleanup_session(session_info) log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), session_info.id, session_info.local_identity.uri, session_info.remote_identity.uri, reason)) # check if missed incoming call if session_info.direction == 'incoming' and code == 487: data = dict(sylkrtc='account_event', account=session_info.account_id, event='missed_session', data=dict(originator=session_info.remote_identity.__dict__)) log.msg('Incoming session from %s missed' % session_info.remote_identity.uri) self._send_data(json.dumps(data)) elif event_type == 'missed_call': try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return originator_uri = SIP_PREFIX_RE.sub('', event_data['result']['caller']) originator_display_name = event_data['result'].get('displayname', '').replace('"', '') # We have no session, so create an identity object by hand originator = SessionPartyIdentity(originator_uri, originator_display_name) data = dict(sylkrtc='account_event', account=account_info.id, event='missed_session', data=dict(originator=originator.__dict__)) log.msg('Incoming session from %s missed' % originator.uri) self._send_data(json.dumps(data)) elif event_type in ('ack', 'declining', 'hangingup'): # ignore pass else: log.warn('Unexpected SIP plugin event type: %s' % event_type) else: log.warn('Unexpected event: %s' % event) elif event_type == 'webrtcup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return session_info.state = 'established' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) self._send_data(json.dumps(data)) elif event_type == 'hangup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return if session_info.state != 'terminated': session_info.state = 'terminated' code = event.get('code', 0) reason = event.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) self._send_data(json.dumps(data)) self._cleanup_session(session_info) log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), session_info.id, session_info.local_identity.uri, session_info.remote_identity.uri, reason)) elif event_type in ('media', 'detached'): # ignore pass else: log.warn('Received unexpected event type: %s' % event_type) class SylkWebSocketServerFactory(WebSocketServerFactory): implements(IObserver) protocol = SylkWebSocketServerProtocol connections = set() backend = None # assigned by WebHandler def __init__(self, *args, **kw): super(SylkWebSocketServerFactory, self).__init__(*args, **kw) notification_center = NotificationCenter() notification_center.add_observer(self, name='JanusBackendDisconnected') def buildProtocol(self, addr): protocol = self.protocol() protocol.factory = self protocol.backend = self.backend return protocol def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_JanusBackendDisconnected(self, notification): for conn in self.connections.copy(): conn.failConnection() diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample index 59ca3c8..015f194 100644 --- a/webrtcgateway.ini.sample +++ b/webrtcgateway.ini.sample @@ -1,35 +1,39 @@ ; SylkServer WebRTC gateway configuration file ; ; For the gateway to work Janus needs to be properly installed and configured, ; please refer to README.webrtc for detailed instructions ; [General] ; List of allowed web origins. The connection origin (Origin header in the ; HTTP request) will be checked against the list defined here, if the domain ; is no allowed the connection will be refused. ; * (the default) means any ; web_origins = * ; Proxy used for outbound SIP traffic ; outbound_sip_proxy = ; List of allowed SIP domains for managing accounts ; sip_domains = * ; Boolean indicating if the WebSocket messages sent to/from clients should be logged ; to a file ; trace_websocket = False +; WebSocket Ping frames are sent at the configured interval, this helps detect dead +; client connections +; websocket_ping_interval = 120 + [Janus] ; URL pointing to the Janus API endpoint (only WebSocket is supported) ; api_url = ws://localhost:8188 ; API secret shared with Janus (must match the value in janus.cfg) ; A random UUID value is recommended, a new value can be generated with ; the following command: ; python -c 'import uuid; print(uuid.uuid4().hex)' api_secret = 0745f2f74f34451c89343afcdcae5809 ; Boolean indicating if the messages between SylkServer and Janus should be logged to ; a file ; trace_janus = False