diff --git a/debian/control b/debian/control index 5df65ea..587b590 100644 --- a/debian/control +++ b/debian/control @@ -1,35 +1,35 @@ Source: sylkserver Section: net Priority: optional Maintainer: Saul Ibarra Uploaders: Dan Pascu , Adrian Georgescu Build-Depends: debhelper (>= 7.3.5), dh-python, dh-systemd, python-all (>= 2.7) Standards-Version: 3.9.6 Package: sylkserver Architecture: all Depends: ${python:Depends}, ${misc:Depends}, python-application (>= 1.4.0), python-eventlib, python-lxml, python-sipsimple (>= 3.0.0), python-twisted, python-klein Suggests: libavahi-compat-libdnssd1, python-twisted-words, python-wokkel (>= 0.7.0), sylkserver-webrtc-gateway Recommends: sylkserver-sounds Description: A state of the art, extensible SIP Application Server SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. Package: sylkserver-sounds Architecture: all Depends: ${misc:Depends}, sylkserver Description: A state of the art, extensible SIP Application Server SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. . This package contains sounds used by SylkServer. Package: sylkserver-webrtc-gateway Architecture: all -Depends: ${misc:Depends}, sylkserver, janus (>= 0.1.0), python-autobahn (>= 0.10.3) +Depends: ${misc:Depends}, sylkserver, janus (>= 0.1.0), python-autobahn (>= 0.10.3), python-jsonmodels Description: A state of the art, extensible SIP Application Server SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. . This is a meta-package containing the dependencies required to run the WebRTC gateway application. diff --git a/sylk/applications/webrtcgateway/models/__init__.py b/sylk/applications/webrtcgateway/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py new file mode 100644 index 0000000..80de8d8 --- /dev/null +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -0,0 +1,111 @@ + +__all__ = ['AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest', + 'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest', + 'AckResponse', 'ErrorResponse', + 'ReadyEvent'] + +import re + +from jsonmodels import models, fields, errors, validators +from sipsimple.core import SIPURI, SIPCoreError + +SIP_PREFIX_RE = re.compile('^sips?:') + + +class DefaultValueField(fields.BaseField): + def __init__(self, value): + self.default_value = value + super(DefaultValueField, self).__init__() + + def validate(self, value): + if value != self.default_value: + raise errors.ValidationError('%s doesn\'t match the expected value %s' % (value, self.default_value)) + + def get_default_value(self): + return self.default_value + + +def URIValidator(value): + account = SIP_PREFIX_RE.sub('', value) + try: + SIPURI.parse('sip:%s' % account) + except SIPCoreError: + raise errors.ValidationError('invalid account: %s' % value) + + +class SylkRTCRequestBase(models.Base): + transaction = fields.StringField(required=True) + + +class AccountRequestBase(SylkRTCRequestBase): + account = fields.StringField(required=True, + validators=[URIValidator]) + + +class AccountAddRequest(AccountRequestBase): + sylkrtc = DefaultValueField('account-add') + password = fields.StringField(required=True, + validators=[validators.Length(minimum_value=1, maximum_value=9999)]) + + +class AccountRemoveRequest(AccountRequestBase): + sylkrtc = DefaultValueField('account-remove') + + +class AccountRegisterRequest(AccountRequestBase): + sylkrtc = DefaultValueField('account-register') + + +class AccountUnregisterRequest(AccountRequestBase): + sylkrtc = DefaultValueField('account-unregister') + + +class SessionRequestBase(SylkRTCRequestBase): + session = fields.StringField(required=True) + + +class SessionCreateRequest(SessionRequestBase): + sylkrtc = DefaultValueField('session-create') + account = fields.StringField(required=True, + validators=[URIValidator]) + uri = fields.StringField(required=True, + validators=[URIValidator]) + sdp = fields.StringField(required=True) + + +class SessionAnswerRequest(SessionRequestBase): + sylkrtc = DefaultValueField('session-answer') + sdp = fields.StringField(required=True) + + +class ICECandidate(models.Base): + candidate = fields.StringField(required=True) + sdpMLineIndex = fields.IntField(required=True) + sdpMid = fields.StringField(required=True) + + +class SessionTrickleRequest(SessionRequestBase): + sylkrtc = DefaultValueField('session-trickle') + candidates = fields.ListField([ICECandidate]) + + +class SessionTerminateRequest(SessionRequestBase): + sylkrtc = DefaultValueField('session-terminate') + + +class SylkRTCResponseBase(models.Base): + transaction = fields.StringField(required=True) + + +class AckResponse(SylkRTCResponseBase): + sylkrtc = DefaultValueField('ack') + + +class ErrorResponse(SylkRTCResponseBase): + sylkrtc = DefaultValueField('error') + error = fields.StringField(required=True) + + +class ReadyEvent(models.Base): + sylkrtc = DefaultValueField('event') + event = DefaultValueField('ready') diff --git a/sylk/applications/webrtcgateway/web/api.py b/sylk/applications/webrtcgateway/web/api.py index 00b30a4..c408ed0 100644 --- a/sylk/applications/webrtcgateway/web/api.py +++ b/sylk/applications/webrtcgateway/web/api.py @@ -1,786 +1,740 @@ import json import random import re import uuid from application.python import Null from application.notification import IObserver, NotificationCenter from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory from eventlib import coros, proc from eventlib.twistedutil import block_on from sipsimple.configuration.settings import SIPSimpleSettings -from sipsimple.core import SIPURI, SIPCoreError +from sipsimple.core import SIPURI from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.applications.webrtcgateway.configuration import GeneralConfig from sylk.applications.webrtcgateway.logger import log +from sylk.applications.webrtcgateway.models import sylkrtc from sylk.applications.webrtcgateway.util import GreenEvent try: from autobahn.websocket.http import HttpException except ImportError: # AutoBahn 0.12 changed this from autobahn.websocket import ConnectionDeny as HttpException SYLK_WS_PROTOCOL = 'sylkRTC-1' SIP_PREFIX_RE = re.compile('^sips?:') +sylkrtc_models = { + 'account-add' : sylkrtc.AccountAddRequest, + 'account-remove' : sylkrtc.AccountRemoveRequest, + 'account-register' : sylkrtc.AccountRegisterRequest, + 'account-unregister' : sylkrtc.AccountUnregisterRequest, + 'session-create' : sylkrtc.SessionCreateRequest, + 'session-answer' : sylkrtc.SessionAnswerRequest, + 'session-trickle' : sylkrtc.SessionTrickleRequest, + 'session-terminate' : sylkrtc.SessionTerminateRequest, +} + class AccountInfo(object): def __init__(self, id, password): self.id = id self.password = password self.registration_state = None self.janus_handle_id = None @property def uri(self): return 'sip:%s' % self.id class SessionPartyIdentity(object): def __init__(self, uri, display_name=''): self.uri = uri self.display_name = display_name class SessionInfoBase(object): type = None # override in subclass def __init__(self, id): self.id = id self.direction = None self.state = None self.account_id = None self.local_identity = None # instance of SessionPartyIdentity self.remote_identity = None # instance of SessionPartyIdentity def init_outgoing(self, account_id, destination): self.account_id = account_id self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account_id, originator, originator_display_name=''): self.account_id = account_id self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class JanusSessionInfo(SessionInfoBase): type = 'janus' def __init__(self, id): super(JanusSessionInfo, self).__init__(id) self.janus_handle_id = None class Operation(object): __slots__ = ('name', 'data') def __init__(self, name, data): self.name = name self.data = data class APIError(Exception): pass -class SylkWebSocketServerProtocol(WebSocketServerProtocol): - janus_session_id = None - accounts_map = None # account ID -> account - account_handles_map = None # Janus handle ID -> account - sessions_map = None # session ID -> session - session_handles_map = None # Janus handle ID -> session - ready_event = None - resolver = None - proc = None - operations_queue = None - - def onConnect(self, request): - log.msg('Incoming connection from %s (origin %s)' % (request.peer, request.origin)) - if SYLK_WS_PROTOCOL not in request.protocols: - log.msg('Rejecting connection, remote does not support our sub-protocol') - raise HttpException(406, 'No compatible protocol specified') - if not self.backend.ready: - log.msg('Rejecting connection, backend is not connected') - raise HttpException(503, 'Backend is not connected') - return SYLK_WS_PROTOCOL - - def onOpen(self): - self.factory.connections.add(self) - self.accounts_map = {} - self.account_handles_map = {} - self.sessions_map = {} - self.session_handles_map = {} +class ConnectionHandler(object): + def __init__(self, protocol): + self.protocol = protocol + self.janus_session_id = None + self.accounts_map = {} # account ID -> account + self.account_handles_map = {} # Janus handle ID -> account + self.sessions_map = {} # session ID -> session + self.session_handles_map = {} # Janus handle ID -> session self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() - self._create_janus_session() - def onMessage(self, payload, isBinary): - if isBinary: - log.warn('Received invalid binary message') - return - if GeneralConfig.trace_websocket: - self.factory.ws_logger.msg("IN", ISOTimestamp.now(), payload) - try: - data = json.loads(payload) - except Exception, e: - log.warn('Error parsing WebSocket payload: %s' % e) - return - try: - request_type = data.pop('sylkrtc') - except KeyError: - log.warn('Error getting WebSocket message type') - return - self.ready_event.wait() - op = Operation(request_type.lower(), data) - self.operations_queue.send(op) + def start(self): + self._create_janus_session() - def onClose(self, wasClean, code, reason): - if self.ready_event is None: - # Very early connection closed, onOpen wasn't even called - return - log.msg('Connection from %s closed' % self.transport.getPeer()) - self.factory.connections.discard(self) + def stop(self): if self.ready_event.is_set(): assert self.janus_session_id is not None - self.backend.janus_stop_keepalive(self.janus_session_id) - self.backend.janus_destroy_session(self.janus_session_id) + self.protocol.backend.janus_stop_keepalive(self.janus_session_id) + self.protocol.backend.janus_destroy_session(self.janus_session_id) if self.proc is not None: self.proc.kill() self.proc = None # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sessions_map.clear() self.session_handles_map.clear() self.janus_session_id = None + self.protocol = None - def disconnect(self, code=1000, reason=u''): - self.sendClose(code, reason) + def handle_message(self, data): + try: + request_type = data.pop('sylkrtc') + except KeyError: + log.warn('Error getting WebSocket message type') + return + self.ready_event.wait() + try: + model = sylkrtc_models[request_type] + except KeyError: + log.warn('Unknown request type: %s' % request_type) + return + request = model(**data) + try: + request.validate() + except Exception, e: + log.error('%s: %s' % (request_type, e)) + if request.transaction: + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + return + op = Operation(request_type, request) + self.operations_queue.send(op) # internal methods (not overriding / implementing the protocol API) + def _send_response(self, response): + response.validate() + self._send_data(json.dumps(response.to_struct())) + def _send_data(self, data): if GeneralConfig.trace_websocket: - self.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data) - self.sendMessage(data, False) + self.protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data) + self.protocol.sendMessage(data, False) def _cleanup_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return self.sessions_map.pop(session.id) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the # same as the account handle, so don't - block_on(self.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) - self.backend.janus_set_event_handler(session.janus_handle_id, None) + block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) + self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None) self.session_handles_map.pop(session.janus_handle_id) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) @run_in_green_thread def _create_janus_session(self): if self.ready_event.is_set(): - data = dict(sylkrtc='event', event='ready') - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ReadyEvent()) return try: - self.janus_session_id = block_on(self.backend.janus_create_session()) - self.backend.janus_start_keepalive(self.janus_session_id) + self.janus_session_id = block_on(self.protocol.backend.janus_create_session()) + self.protocol.backend.janus_start_keepalive(self.janus_session_id) except Exception, e: log.warn('Error creating session, disconnecting: %s' % e) - self.disconnect(3000, unicode(e)) + self.protocol.disconnect(3000, unicode(e)) return - data = dict(sylkrtc='event', event='ready') - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ReadyEvent()) self.ready_event.set() def _lookup_sip_proxy(self, account): sip_uri = SIPURI.parse('sip:%s' % account) # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: proxy_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: proxy_uri = SIPURI(host=sip_uri.host) settings = SIPSimpleSettings() - try: - routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait() - except DNSLookupError, e: - raise DNSLookupError('DNS lookup error: %s' % e) + routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait() if not routes: - raise DNSLookupError('DNS lookup error: no results found') + raise DNSLookupError('no results found') # Get all routes with the highest priority transport and randomly pick one - route = random.choice([r for r in routes if r.transport==routes[0].transport]) + route = random.choice([r for r in routes if r.transport == routes[0].transport]) # Build a proxy URI Sofia-SIP likes - return '%s:%s:%d%s' % ('sips' if route.transport=='tls' else 'sip', + return '%s:%s:%d%s' % ('sips' if route.transport == 'tls' else 'sip', route.address, route.port, ';transport=%s' % route.transport if route.transport != 'tls' else '') def _handle_janus_event(self, handle_id, event_type, event): + # TODO: use a model op = Operation('janus-event', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _operations_handler(self): while True: op = self.operations_queue.wait() handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null) try: handler(op.data) except Exception: - log.err() - del op - del handler - - def _OH_account_add(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + log.exception('Unhandled exception in operation "%s"' % op.name) + del op, handler - try: - try: - account = data['account'] - password = data['password'] - except KeyError: - raise APIError('Invalid parameters: "account" and "password" must be specified') + def _OH_account_add(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account + password = request.password + try: if account in self.accounts_map: - log.warn('Account %s already added' % account) - data = dict(sylkrtc='error', transaction=transaction, error='Account already added') - self._send_data(json.dumps(data)) - return + raise APIError('Account %s already added' % account) - # Validate URI - uri = 'sip:%s' % account - try: - sip_uri = SIPURI.parse(uri) - except SIPCoreError: - raise APIError('Invalid account specified: %s' % account) - if not {'*', sip_uri.host}.intersection(GeneralConfig.sip_domains): - raise APIError('SIP domain not allowed: %s' % sip_uri.host) + # check if domain is acceptable + domain = account.partition('@')[2] + if not {'*', domain}.intersection(GeneralConfig.sip_domains): + raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(account, password) self.accounts_map[account_info.id] = account_info - - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Account %s added' % account) except APIError, e: log.error('account_add: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in account_add: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('Account %s added' % account) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_account_remove(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_account_remove(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account try: - try: - account = data['account'] - except KeyError: - raise APIError('Invalid parameters: "account" must be specified') - try: account_info = self.accounts_map.pop(account) except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: - block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) - self.backend.janus_set_event_handler(handle_id, None) + block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) + self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) - - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Account %s removed' % account) except APIError, e: log.error('account_remove: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in account_remove: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('Account %s removed' % account) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_account_register(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_account_register(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account try: - try: - account = data['account'] - except KeyError: - raise APIError('Invalid parameters: "account" must be specified') - try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) - proxy = self._lookup_sip_proxy(account) + try: + proxy = self._lookup_sip_proxy(account) + except DNSLookupError: + raise APIError('DNS lookup error') handle_id = account_info.janus_handle_id if handle_id is not None: # Destroy the existing plugin handle - block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) - self.backend.janus_set_event_handler(handle_id, None) + block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) + self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) account_info.janus_handle_id = None # Create a plugin handle - handle_id = block_on(self.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) - self.backend.janus_set_event_handler(handle_id, self._handle_janus_event) + handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) + self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event) account_info.janus_handle_id = handle_id self.account_handles_map[handle_id] = account_info data = {'request': 'register', 'username': account_info.uri, 'ha1_secret': account_info.password, 'proxy': proxy} - block_on(self.backend.janus_message(self.janus_session_id, handle_id, data)) - - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Account %s will register' % account) + block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except APIError, e: log.error('account-register: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in account-register: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('Account %s will register' % account) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_account_unregister(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_account_unregister(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account try: - try: - account = data['account'] - except KeyError: - raise APIError('Invalid parameters: "account" must be specified') - try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: - block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) - self.backend.janus_set_event_handler(handle_id, None) + block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) + self.protocol.backend.janus_set_event_handler(handle_id, None) account_info.janus_handle_id = None self.account_handles_map.pop(handle_id) - - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Account %s will unregister' % account) except APIError, e: log.error('account-unregister: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in account-unregister: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('Account %s will unregister' % account) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_session_create(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_session_create(self, request): + # extract the fields to avoid going through the descriptor several times + account = request.account + session = request.session + uri = request.uri + sdp = request.sdp try: - try: - account = data['account'] - session = data['session'] - uri = data['uri'] - sdp = data['sdp'] - except KeyError: - raise APIError('Invalid parameters: "account", "session", "uri" and "sdp" must be specified') - try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.sessions_map: raise APIError('Session ID (%s) already in use' % session) # Create a new plugin handle and 'register' it, without actually doing so - handle_id = block_on(self.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) - self.backend.janus_set_event_handler(handle_id, self._handle_janus_event) + handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) + self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event) try: proxy = self._lookup_sip_proxy(account_info.id) except DNSLookupError: - block_on(self.backend.janus_detach(self.janus_session_id, handle_id)) - self.backend.janus_set_event_handler(handle_id, None) + block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) + self.protocol.backend.janus_set_event_handler(handle_id, None) raise APIError('DNS lookup error') account_uri = 'sip:%s' % account_info.id - data = {'request': 'register', 'username': account_uri, 'ha1_secret': account_info.password, 'proxy': proxy, 'send_register': False} - block_on(self.backend.janus_message(self.janus_session_id, handle_id, data)) + data = {'request': 'register', 'username': account_uri, 'ha1_secret': account_info.password, 'proxy': proxy, + 'send_register': False} + block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) session_info = JanusSessionInfo(session) session_info.janus_handle_id = handle_id session_info.init_outgoing(account, uri) self.sessions_map[session_info.id] = session_info self.session_handles_map[handle_id] = session_info data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'} jsep = {'type': 'offer', 'sdp': sdp} - block_on(self.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Outgoing session %s from %s to %s created' % (session, account, uri)) + block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) except APIError, e: log.error('session-create: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in session-create: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('Outgoing session %s from %s to %s created' % (session, account, uri)) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_session_answer(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_session_answer(self, request): + # extract the fields to avoid going through the descriptor several times + session = request.session + sdp = request.sdp try: - try: - session = data['session'] - sdp = data['sdp'] - except KeyError: - raise APIError('Invalid parameters: "session" and "sdp" must be specified') - try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session') if session_info.state != 'connecting': raise APIError('Invalid state for session answer') data = {'request': 'accept'} jsep = {'type': 'answer', 'sdp': sdp} - block_on(self.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('%s answered session %s' % (session_info.account_id, session)) + block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) except APIError, e: log.error('session-answer: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in session-answer: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('%s answered session %s' % (session_info.account_id, session)) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_session_trickle(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_session_trickle(self, request): + # extract the fields to avoid going through the descriptor several times + session = request.session + candidates = [c.to_struct() for c in request.candidates] try: - try: - session = data['session'] - candidates = data['candidates'] - except KeyError: - raise APIError('Invalid parameters: "session" and "candidates" must be specified') - try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state == 'terminated': raise APIError('Session is terminated') - block_on(self.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('Trickled ICE candidate(s) for session %s' % session) + block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) except APIError, e: log.error('session-trickle: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in session-trickle: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + if candidates: + log.msg('Trickled ICE candidate(s) for session %s' % session) + else: + log.msg('Trickled ICE candidates end for session %s' % session) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) - def _OH_session_terminate(self, data): - transaction = data.get('transaction', None) - if transaction is None: - log.warn('Transaction not specified!') - return + def _OH_session_terminate(self, request): + # extract the fields to avoid going through the descriptor several times + session = request.session try: - try: - session = data['session'] - except KeyError: - raise APIError('Invalid parameters: "session" must be specified') - try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state) if session_info.direction == 'incoming' and session_info.state == 'connecting': data = {'request': 'decline', 'code': 486} else: data = {'request': 'hangup'} - block_on(self.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data)) - data = dict(sylkrtc='ack', transaction=transaction) - self._send_data(json.dumps(data)) - log.msg('%s terminated session %s' % (session_info.account_id, session)) + block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data)) except APIError, e: log.error('session-terminate: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) - except Exception, e: - log.error('Unexpected error in session-terminate: %s' % e) - data = dict(sylkrtc='error', transaction=transaction, error=str(e)) - self._send_data(json.dumps(data)) + self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) + else: + log.msg('%s terminated session %s' % (session_info.account_id, session)) + self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) # Event handlers def _OH_janus_event(self, data): handle_id = data['handle_id'] event_type = data['event_type'] event = data['event'] if event_type == 'event': - event_data = event['plugindata']['data'] - if 'result' in event_data: - jsep = event.get('jsep', None) - event_type = event_data['result']['event'] - if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'): - # skip 'registered' events from session handles - if event_type == 'registered' and event_data['result']['register_sent'] in (False, 'false'): - return - # account event - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - log.warn('Could not find account for handle ID %s' % handle_id) - return - if event_type == 'incomingcall': - originator_uri = SIP_PREFIX_RE.sub('', event_data['result']['username']) - originator_display_name = event_data['result'].get('displayname', '').replace('"', '') - jsep = event.get('jsep', None) - assert jsep is not None - session_id = uuid.uuid4().hex - session = JanusSessionInfo(session_id) - session.janus_handle_id = handle_id - session.init_incoming(account_info.id, originator_uri, originator_display_name) - self.sessions_map[session_id] = session - self.session_handles_map[handle_id] = session - data = dict(sylkrtc='account_event', - account=account_info.id, - session=session_id, - event='incoming_session', - data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp'])) - log.msg('Incoming session %s %s <-> %s created' % (session.id, - session.remote_identity.uri, - session.local_identity.uri)) - else: - registration_state = event_type - if registration_state == 'registration_failed': - registration_state = 'failed' - if account_info.registration_state == registration_state: - return - account_info.registration_state = registration_state - registration_data = dict(state=registration_state) - if registration_state == 'failed': - code = event_data['result']['code'] - reason = event_data['result']['reason'] - registration_data['reason'] = '%d %s' % (code, reason) - data = dict(sylkrtc='account_event', - account=account_info.id, - event='registration_state', - data=registration_data) - log.msg('Account %s registration state changed to %s' % (account_info.id, registration_state)) - self._send_data(json.dumps(data)) - elif event_type in ('calling', 'accepted', 'hangup'): - # session event - try: - session_info = self.session_handles_map[handle_id] - except KeyError: - log.warn('Could not find session for handle ID %s' % handle_id) - return - if event_type == 'hangup' and session_info.state == 'terminated': - return - if event_type == 'calling': - session_info.state = 'progress' - elif event_type == 'accepted': - session_info.state = 'accepted' - elif event_type == 'hangup': - session_info.state = 'terminated' - data = dict(sylkrtc='session_event', - session=session_info.id, - event='state', - data=dict(state=session_info.state)) - log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) - if session_info.state == 'accepted' and session_info.direction == 'outgoing': - assert jsep is not None - data['data']['sdp'] = jsep['sdp'] - elif session_info.state == 'terminated': - code = event_data['result'].get('code', 0) - reason = event_data['result'].get('reason', 'Unknown') - reason = '%d %s' % (code, reason) - data['data']['reason'] = reason - self._send_data(json.dumps(data)) - if session_info.state == 'terminated': - self._cleanup_session(session_info) - log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), - session_info.id, - session_info.local_identity.uri, - session_info.remote_identity.uri, - reason)) - # check if missed incoming call - if session_info.direction == 'incoming' and code == 487: - data = dict(sylkrtc='account_event', - account=session_info.account_id, - event='missed_session', - data=dict(originator=session_info.remote_identity.__dict__)) - log.msg('Incoming session from %s missed' % session_info.remote_identity.uri) - self._send_data(json.dumps(data)) - elif event_type == 'missed_call': - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - log.warn('Could not find account for handle ID %s' % handle_id) - return - originator_uri = SIP_PREFIX_RE.sub('', event_data['result']['caller']) - originator_display_name = event_data['result'].get('displayname', '').replace('"', '') - # We have no session, so create an identity object by hand - originator = SessionPartyIdentity(originator_uri, originator_display_name) - data = dict(sylkrtc='account_event', - account=account_info.id, - event='missed_session', - data=dict(originator=originator.__dict__)) - log.msg('Incoming session from %s missed' % originator.uri) - self._send_data(json.dumps(data)) - elif event_type in ('ack', 'declining', 'hangingup'): - # ignore - pass - else: - log.warn('Unexpected SIP plugin event type: %s' % event_type) - else: - log.warn('Unexpected event: %s' % event) + plugin = event['plugindata']['plugin'].split('.')[-1].lower() + handler = getattr(self, '_OH_janus_event_plugin_%s' % plugin, Null) + handler(data) elif event_type == 'webrtcup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return session_info.state = 'established' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) + # TODO: SessionEvent model self._send_data(json.dumps(data)) elif event_type == 'hangup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return if session_info.state != 'terminated': session_info.state = 'terminated' code = event.get('code', 0) reason = event.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) + # TODO: SessionEvent model self._send_data(json.dumps(data)) self._cleanup_session(session_info) log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), session_info.id, session_info.local_identity.uri, session_info.remote_identity.uri, reason)) elif event_type in ('media', 'detached'): # ignore pass else: log.warn('Received unexpected event type: %s' % event_type) + def _OH_janus_event_plugin_sip(self, data): + handle_id = data['handle_id'] + event = data['event'] + plugin_data = event['plugindata'] + assert(plugin_data['plugin'] == 'janus.plugin.sip') + event_data = event['plugindata']['data'] + assert(event_data.get('sip', '') == 'event') + + if 'result' not in event_data: + log.warn('Unexpected event: %s' % event) + return + + event_data = event_data['result'] + jsep = event.get('jsep', None) + event_type = event_data['event'] + if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'): + # skip 'registered' events from session handles + if event_type == 'registered' and event_data['register_sent'] in (False, 'false'): + return + # account event + try: + account_info = self.account_handles_map[handle_id] + except KeyError: + log.warn('Could not find account for handle ID %s' % handle_id) + return + if event_type == 'incomingcall': + originator_uri = SIP_PREFIX_RE.sub('', event_data['username']) + originator_display_name = event_data.get('displayname', '').replace('"', '') + jsep = event.get('jsep', None) + assert jsep is not None + session_id = uuid.uuid4().hex + session = JanusSessionInfo(session_id) + session.janus_handle_id = handle_id + session.init_incoming(account_info.id, originator_uri, originator_display_name) + self.sessions_map[session_id] = session + self.session_handles_map[handle_id] = session + data = dict(sylkrtc='account_event', + account=account_info.id, + session=session_id, + event='incoming_session', + data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp'])) + log.msg('Incoming session %s %s <-> %s created' % (session.id, + session.remote_identity.uri, + session.local_identity.uri)) + else: + registration_state = event_type + if registration_state == 'registration_failed': + registration_state = 'failed' + if account_info.registration_state == registration_state: + return + account_info.registration_state = registration_state + registration_data = dict(state=registration_state) + if registration_state == 'failed': + code = event_data['code'] + reason = event_data['reason'] + registration_data['reason'] = '%d %s' % (code, reason) + data = dict(sylkrtc='account_event', + account=account_info.id, + event='registration_state', + data=registration_data) + log.msg('Account %s registration state changed to %s' % (account_info.id, registration_state)) + # TODO: AccountEvent model + self._send_data(json.dumps(data)) + elif event_type in ('calling', 'accepted', 'hangup'): + # session event + try: + session_info = self.session_handles_map[handle_id] + except KeyError: + log.warn('Could not find session for handle ID %s' % handle_id) + return + if event_type == 'hangup' and session_info.state == 'terminated': + return + if event_type == 'calling': + session_info.state = 'progress' + elif event_type == 'accepted': + session_info.state = 'accepted' + elif event_type == 'hangup': + session_info.state = 'terminated' + data = dict(sylkrtc='session_event', + session=session_info.id, + event='state', + data=dict(state=session_info.state)) + log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) + if session_info.state == 'accepted' and session_info.direction == 'outgoing': + assert jsep is not None + data['data']['sdp'] = jsep['sdp'] + elif session_info.state == 'terminated': + code = event_data.get('code', 0) + reason = event_data.get('reason', 'Unknown') + reason = '%d %s' % (code, reason) + data['data']['reason'] = reason + # TODO: SessionEvent model + self._send_data(json.dumps(data)) + if session_info.state == 'terminated': + self._cleanup_session(session_info) + log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), + session_info.id, + session_info.local_identity.uri, + session_info.remote_identity.uri, + reason)) + # check if missed incoming call + if session_info.direction == 'incoming' and code == 487: + data = dict(sylkrtc='account_event', + account=session_info.account_id, + event='missed_session', + data=dict(originator=session_info.remote_identity.__dict__)) + log.msg('Incoming session from %s missed' % session_info.remote_identity.uri) + # TODO: AccountEvent model + self._send_data(json.dumps(data)) + elif event_type == 'missed_call': + try: + account_info = self.account_handles_map[handle_id] + except KeyError: + log.warn('Could not find account for handle ID %s' % handle_id) + return + originator_uri = SIP_PREFIX_RE.sub('', event_data['caller']) + originator_display_name = event_data.get('displayname', '').replace('"', '') + # We have no session, so create an identity object by hand + originator = SessionPartyIdentity(originator_uri, originator_display_name) + data = dict(sylkrtc='account_event', + account=account_info.id, + event='missed_session', + data=dict(originator=originator.__dict__)) + log.msg('Incoming session from %s missed' % originator.uri) + # TODO: AccountEvent model + self._send_data(json.dumps(data)) + elif event_type in ('ack', 'declining', 'hangingup'): + # ignore + pass + else: + log.warn('Unexpected SIP plugin event type: %s' % event_type) + + +class SylkWebSocketServerProtocol(WebSocketServerProtocol): + backend = None + connection_handler = None + + def onConnect(self, request): + log.msg('Incoming connection from %s (origin %s)' % (request.peer, request.origin)) + if SYLK_WS_PROTOCOL not in request.protocols: + log.msg('Rejecting connection, remote does not support our sub-protocol') + raise HttpException(406, 'No compatible protocol specified') + if not self.backend.ready: + log.msg('Rejecting connection, backend is not connected') + raise HttpException(503, 'Backend is not connected') + return SYLK_WS_PROTOCOL + + def onOpen(self): + self.factory.connections.add(self) + self.connection_handler = ConnectionHandler(self) + self.connection_handler.start() + + def onMessage(self, payload, is_binary): + if is_binary: + log.warn('Received invalid binary message') + return + if GeneralConfig.trace_websocket: + self.factory.ws_logger.msg("IN", ISOTimestamp.now(), payload) + try: + data = json.loads(payload) + except Exception, e: + log.warn('Error parsing WebSocket payload: %s' % e) + return + self.connection_handler.handle_message(data) + + def onClose(self, clean, code, reason): + if self.connection_handler is None: + # Very early connection closed, onOpen wasn't even called + return + log.msg('Connection from %s closed' % self.transport.getPeer()) + self.factory.connections.discard(self) + self.connection_handler.stop() + self.connection_handler = None + + def disconnect(self, code=1000, reason=u''): + self.sendClose(code, reason) + class SylkWebSocketServerFactory(WebSocketServerFactory): implements(IObserver) protocol = SylkWebSocketServerProtocol connections = set() backend = None # assigned by WebHandler def __init__(self, *args, **kw): super(SylkWebSocketServerFactory, self).__init__(*args, **kw) notification_center = NotificationCenter() notification_center.add_observer(self, name='JanusBackendDisconnected') def buildProtocol(self, addr): protocol = self.protocol() protocol.factory = self protocol.backend = self.backend return protocol def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_JanusBackendDisconnected(self, notification): for conn in self.connections.copy(): conn.failConnection()