diff --git a/sylk/applications/webrtcgateway/web/__init__.py b/sylk/applications/webrtcgateway/web/__init__.py index 5e957d5..d4af2e2 100644 --- a/sylk/applications/webrtcgateway/web/__init__.py +++ b/sylk/applications/webrtcgateway/web/__init__.py @@ -1,81 +1,82 @@ from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource from sylk import __version__ as sylk_version from sylk.applications.webrtcgateway.configuration import GeneralConfig, JanusConfig from sylk.applications.webrtcgateway.janus.backend import JanusBackend from sylk.applications.webrtcgateway.logger import log -from sylk.applications.webrtcgateway.web.api import SYLK_WS_PROTOCOL, SylkWebSocketServerFactory +from sylk.applications.webrtcgateway.web.factory import SylkWebSocketServerFactory +from sylk.applications.webrtcgateway.web.protocol import SYLK_WS_PROTOCOL from sylk.applications.webrtcgateway.websocket_logger import Logger as WSLogger from sylk.resources import Resources from sylk.web import Klein, StaticFileResource, server class WebRTCGatewayWeb(object): __metaclass__ = Singleton app = Klein() _resource = None _ws_resource = None def __init__(self, ws_factory): self._ws_resource = WebSocketResource(ws_factory) def resource(self): if self._resource is None: self._resource = self.app.resource() return self._resource @app.route('/') def index(self, request): path = Resources.get('html/webrtcgateway/index.html') r = StaticFileResource(path) r.isLeaf = True return r @app.route('/ws') def ws(self, request): return self._ws_resource class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None self.ws_logger = WSLogger() def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version, debug=False) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.factory.ws_logger = self.ws_logger self.web = WebRTCGatewayWeb(self.factory) server.register_resource('webrtcgateway', self.web.resource()) log.msg('WebSocket handler started at %s' % ws_url) log.msg('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.msg('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.msg('Using Janus API: %s' % JanusConfig.api_url) self.ws_logger.start() self.backend = JanusBackend() self.backend.start() self.factory.backend = self.backend def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.failConnection() self.factory = None if self.backend is not None: self.backend.stop() self.backend = None self.ws_logger.stop() diff --git a/sylk/applications/webrtcgateway/web/factory.py b/sylk/applications/webrtcgateway/web/factory.py new file mode 100644 index 0000000..28c9190 --- /dev/null +++ b/sylk/applications/webrtcgateway/web/factory.py @@ -0,0 +1,79 @@ + +import weakref + +from application.notification import IObserver, NotificationCenter +from application.python import Null +from autobahn.twisted.websocket import WebSocketServerFactory +from zope.interface import implements + +from sylk.applications.webrtcgateway.web.protocol import SylkWebSocketServerProtocol, SYLK_WS_PROTOCOL + + +class VideoRoomContainer(object): + def __init__(self): + self._rooms = set() + self._uri_map = weakref.WeakValueDictionary() + self._id_map = weakref.WeakValueDictionary() + + def add(self, room): + self._rooms.add(room) + self._uri_map[room.uri] = room + self._id_map[room.id] = room + + def remove(self, value): + if isinstance(value, int): + room = self._id_map.get(value, None) + elif isinstance(value, basestring): + room = self._uri_map.get(value, None) + else: + room = value + self._rooms.discard(room) + + def clear(self): + self._rooms.clear() + + def __len__(self): + return len(self._rooms) + + def __iter__(self): + return iter(self._rooms) + + def __getitem__(self, item): + if isinstance(item, int): + return self._id_map[item] + elif isinstance(item, basestring): + return self._uri_map[item] + else: + raise KeyError('%s not found' % item) + + def __contains__(self, item): + return item in self._id_map or item in self._uri_map or item in self._rooms + + +class SylkWebSocketServerFactory(WebSocketServerFactory): + implements(IObserver) + + protocol = SylkWebSocketServerProtocol + connections = set() + videorooms = VideoRoomContainer() + backend = None # assigned by WebHandler + + def __init__(self, *args, **kw): + super(SylkWebSocketServerFactory, self).__init__(*args, **kw) + notification_center = NotificationCenter() + notification_center.add_observer(self, name='JanusBackendDisconnected') + + def buildProtocol(self, addr): + protocol = self.protocol() + protocol.factory = self + protocol.backend = self.backend + return protocol + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_JanusBackendDisconnected(self, notification): + for conn in self.connections.copy(): + conn.failConnection() + self.videorooms.clear() diff --git a/sylk/applications/webrtcgateway/web/api.py b/sylk/applications/webrtcgateway/web/handler.py similarity index 92% rename from sylk/applications/webrtcgateway/web/api.py rename to sylk/applications/webrtcgateway/web/handler.py index 288dd3b..abd7d2e 100644 --- a/sylk/applications/webrtcgateway/web/api.py +++ b/sylk/applications/webrtcgateway/web/handler.py @@ -1,1326 +1,1199 @@ import json import random import re import uuid import weakref from application.python import Null -from application.notification import IObserver, NotificationCenter -from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory from eventlib import coros, proc from eventlib.twistedutil import block_on from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor -from zope.interface import implements from sylk.applications.webrtcgateway.configuration import GeneralConfig from sylk.applications.webrtcgateway.logger import log from sylk.applications.webrtcgateway.models import sylkrtc from sylk.applications.webrtcgateway.util import GreenEvent -try: - from autobahn.websocket.http import HttpException -except ImportError: - # AutoBahn 0.12 changed this - from autobahn.websocket import ConnectionDeny as HttpException - - -SYLK_WS_PROTOCOL = 'sylkRTC-1' SIP_PREFIX_RE = re.compile('^sips?:') sylkrtc_models = { # account management 'account-add' : sylkrtc.AccountAddRequest, 'account-remove' : sylkrtc.AccountRemoveRequest, 'account-register' : sylkrtc.AccountRegisterRequest, 'account-unregister' : sylkrtc.AccountUnregisterRequest, # session management 'session-create' : sylkrtc.SessionCreateRequest, 'session-answer' : sylkrtc.SessionAnswerRequest, 'session-trickle' : sylkrtc.SessionTrickleRequest, 'session-terminate' : sylkrtc.SessionTerminateRequest, # video conference management 'videoroom-join' : sylkrtc.VideoRoomJoinRequest, 'videoroom-ctl' : sylkrtc.VideoRoomControlRequest, 'videoroom-terminate' : sylkrtc.VideoRoomTerminateRequest, } class AccountInfo(object): def __init__(self, id, password, display_name=None, user_agent=None): self.id = id self.password = password self.display_name = display_name self.user_agent = user_agent self.registration_state = None self.janus_handle_id = None @property def uri(self): return 'sip:%s' % self.id class SessionPartyIdentity(object): def __init__(self, uri, display_name=''): self.uri = uri self.display_name = display_name class SIPSessionInfo(object): def __init__(self, id): self.id = id self.direction = None self.state = None self.account_id = None self.local_identity = None # instance of SessionPartyIdentity self.remote_identity = None # instance of SessionPartyIdentity self.janus_handle_id = None def init_outgoing(self, account_id, destination): self.account_id = account_id self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account_id, originator, originator_display_name=''): self.account_id = account_id self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class VideoRoom(object): def __init__(self, uri): self.uri = uri self.id = random.getrandbits(32) # janus needs numeric room names self.destroyed = False self._session_id_map = weakref.WeakValueDictionary() self._publisher_id_map = weakref.WeakValueDictionary() def add(self, session): assert session.publisher_id is not None assert session.id not in self._session_id_map assert session.publisher_id not in self._publisher_id_map self._session_id_map[session.id] = session self._publisher_id_map[session.publisher_id] = session def __getitem__(self, key): try: return self._session_id_map[key] except KeyError: return self._publisher_id_map[key] def __len__(self): return len(self._session_id_map) -class VideoRoomContainer(object): - def __init__(self): - self._rooms = set() - self._uri_map = weakref.WeakValueDictionary() - self._id_map = weakref.WeakValueDictionary() - - def add(self, room): - self._rooms.add(room) - self._uri_map[room.uri] = room - self._id_map[room.id] = room - - def remove(self, value): - if isinstance(value, int): - room = self._id_map.get(value, None) - elif isinstance(value, basestring): - room = self._uri_map.get(value, None) - else: - room = value - self._rooms.discard(room) - - def clear(self): - self._rooms.clear() - - def __len__(self): - return len(self._rooms) - - def __iter__(self): - return iter(self._rooms) - - def __getitem__(self, item): - if isinstance(item, int): - return self._id_map[item] - elif isinstance(item, basestring): - return self._uri_map[item] - else: - raise KeyError('%s not found' % item) - - def __contains__(self, item): - return item in self._id_map or item in self._uri_map or item in self._rooms - - class VideoRoomSessionInfo(object): def __init__(self, id): self.id = id self.account_id = None self.type = None # publisher / subscriber self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers self.janus_handle_id = None self.room = None self.parent_session = None self.feeds = {} # janus publisher ID -> our publisher ID def initialize(self, account_id, type, room): assert type in ('publisher', 'subscriber') self.account_id = account_id self.type = type self.room = room def __repr__(self): return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type) class VideoRoomSessionContainer(object): def __init__(self): self._sessions = set() self._janus_handle_map = weakref.WeakValueDictionary() self._id_map = weakref.WeakValueDictionary() def add(self, session): assert session not in self._sessions assert session.janus_handle_id not in self._janus_handle_map assert session.id not in self._id_map self._sessions.add(session) self._janus_handle_map[session.janus_handle_id] = session self._id_map[session.id] = session def remove(self, session): self._sessions.discard(session) def clear(self): self._sessions.clear() def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): try: return self._id_map[key] except KeyError: return self._janus_handle_map[key] def __contains__(self, item): return item in self._id_map or item in self._janus_handle_map or item in self._sessions class Operation(object): __slots__ = ('name', 'data') def __init__(self, name, data): self.name = name self.data = data class APIError(Exception): pass class ConnectionHandler(object): def __init__(self, protocol): self.protocol = protocol self.janus_session_id = None self.accounts_map = {} # account ID -> account self.account_handles_map = {} # Janus handle ID -> account self.sessions_map = {} # session ID -> session self.session_handles_map = {} # Janus handle ID -> session self.videoroom_sessions = VideoRoomSessionContainer() # session ID / janus handle ID -> session self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() def start(self): self._create_janus_session() def stop(self): if self.ready_event.is_set(): assert self.janus_session_id is not None self.protocol.backend.janus_stop_keepalive(self.janus_session_id) self.protocol.backend.janus_destroy_session(self.janus_session_id) if self.proc is not None: self.proc.kill() self.proc = None # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sessions_map.clear() self.session_handles_map.clear() self.videoroom_sessions.clear() self.janus_session_id = None self.protocol = None def handle_message(self, data): try: request_type = data.pop('sylkrtc') except KeyError: log.warn('Error getting WebSocket message type') return self.ready_event.wait() try: model = sylkrtc_models[request_type] except KeyError: log.warn('Unknown request type: %s' % request_type) return request = model(**data) try: request.validate() except Exception, e: log.error('%s: %s' % (request_type, e)) if request.transaction: self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) return op = Operation(request_type, request) self.operations_queue.send(op) # internal methods (not overriding / implementing the protocol API) def _send_response(self, response): response.validate() self._send_data(json.dumps(response.to_struct())) def _send_data(self, data): if GeneralConfig.trace_websocket: self.protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data) self.protocol.sendMessage(data, False) def _cleanup_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return self.sessions_map.pop(session.id) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the # same as the account handle, so don't block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None) self.session_handles_map.pop(session.janus_handle_id) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) def _cleanup_videoroom_session(self, session): @run_in_green_thread def do_cleanup(): if self.janus_session_id is None: # The connection was closed, there is noting to do here return if session in self.videoroom_sessions: self.videoroom_sessions.remove(session) block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None) # give it some time to receive other hangup events reactor.callLater(2, do_cleanup) def _maybe_destroy_videoroom(self, videoroom): @run_in_green_thread def f(): if self.protocol is None: # The connection was closed return # destroy the room if empty if not videoroom and not videoroom.destroyed: videoroom.destroyed = True self.protocol.factory.videorooms.remove(videoroom) # create a handle to do the cleanup handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) data = {'request': 'destroy', 'room': videoroom.id} try: block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except Exception: pass block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) log.msg('Video room %s destroyed' % videoroom.uri) # don't destroy it immediately reactor.callLater(5, f) @run_in_green_thread def _create_janus_session(self): if self.ready_event.is_set(): self._send_response(sylkrtc.ReadyEvent()) return try: self.janus_session_id = block_on(self.protocol.backend.janus_create_session()) self.protocol.backend.janus_start_keepalive(self.janus_session_id) except Exception, e: log.warn('Error creating session, disconnecting: %s' % e) self.protocol.disconnect(3000, unicode(e)) return self._send_response(sylkrtc.ReadyEvent()) self.ready_event.set() def _lookup_sip_proxy(self, account): sip_uri = SIPURI.parse('sip:%s' % account) # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: proxy_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: proxy_uri = SIPURI(host=sip_uri.host) settings = SIPSimpleSettings() routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait() if not routes: raise DNSLookupError('no results found') # Get all routes with the highest priority transport and randomly pick one route = random.choice([r for r in routes if r.transport == routes[0].transport]) # Build a proxy URI Sofia-SIP likes return '%s:%s:%d%s' % ('sips' if route.transport == 'tls' else 'sip', route.address, route.port, ';transport=%s' % route.transport if route.transport != 'tls' else '') def _handle_conference_invite(self, originator, room_uri, participants): for p in participants: try: account_info = self.accounts_map[p] except KeyError: continue data = dict(sylkrtc='account_event', account=account_info.id, event='conference_invite', data=dict(originator=dict(uri=originator.id, display_name=originator.display_name), room=room_uri)) log.msg('Conference invitation from %s to %s for room %s' % (originator.id, account_info.id, room_uri)) self._send_data(json.dumps(data)) def _handle_janus_event_sip(self, handle_id, event_type, event): # TODO: use a model op = Operation('janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _handle_janus_event_videoroom(self, handle_id, event_type, event): # TODO: use a model op = Operation('janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event)) self.operations_queue.send(op) def _operations_handler(self): while True: op = self.operations_queue.wait() handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null) try: handler(op.data) except Exception: log.exception('Unhandled exception in operation "%s"' % op.name) del op, handler def _OH_account_add(self, request): # extract the fields to avoid going through the descriptor several times account = request.account password = request.password display_name = request.display_name user_agent = request.user_agent try: if account in self.accounts_map: raise APIError('Account %s already added' % account) # check if domain is acceptable domain = account.partition('@')[2] if not {'*', domain}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(account, password, display_name, user_agent) self.accounts_map[account_info.id] = account_info except APIError, e: log.error('account_add: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s added' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_remove(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map.pop(account) except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) except APIError, e: log.error('account_remove: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s removed' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_register(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) try: proxy = self._lookup_sip_proxy(account) except DNSLookupError: raise APIError('DNS lookup error') handle_id = account_info.janus_handle_id if handle_id is not None: # Destroy the existing plugin handle block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) self.account_handles_map.pop(handle_id) account_info.janus_handle_id = None # Create a plugin handle handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip) account_info.janus_handle_id = handle_id self.account_handles_map[handle_id] = account_info data = {'request': 'register', 'username': account_info.uri, 'display_name': account_info.display_name, 'user_agent': account_info.user_agent, 'ha1_secret': account_info.password, 'proxy': proxy} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except APIError, e: log.error('account-register: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s will register' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_account_unregister(self, request): # extract the fields to avoid going through the descriptor several times account = request.account try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) handle_id = account_info.janus_handle_id if handle_id is not None: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) account_info.janus_handle_id = None self.account_handles_map.pop(handle_id) except APIError, e: log.error('account-unregister: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Account %s will unregister' % account) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_create(self, request): # extract the fields to avoid going through the descriptor several times account = request.account session = request.session uri = request.uri sdp = request.sdp try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.sessions_map: raise APIError('Session ID (%s) already in use' % session) # Create a new plugin handle and 'register' it, without actually doing so handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip) try: proxy = self._lookup_sip_proxy(account_info.id) except DNSLookupError: block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) raise APIError('DNS lookup error') data = {'request': 'register', 'username': account_info.uri, 'display_name': account_info.display_name, 'user_agent': account_info.user_agent, 'ha1_secret': account_info.password, 'proxy': proxy, 'send_register': False} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) session_info = SIPSessionInfo(session) session_info.janus_handle_id = handle_id session_info.init_outgoing(account, uri) # TODO: create a "SessionContainer" object combining the 2 self.sessions_map[session_info.id] = session_info self.session_handles_map[handle_id] = session_info data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'} jsep = {'type': 'offer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) except APIError, e: log.error('session-create: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Outgoing session %s from %s to %s created' % (session, account, uri)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_answer(self, request): # extract the fields to avoid going through the descriptor several times session = request.session sdp = request.sdp try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session') if session_info.state != 'connecting': raise APIError('Invalid state for session answer') data = {'request': 'accept'} jsep = {'type': 'answer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) except APIError, e: log.error('session-answer: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('%s answered session %s' % (session_info.account_id, session)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_trickle(self, request): # extract the fields to avoid going through the descriptor several times session = request.session candidates = [c.to_struct() for c in request.candidates] try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state == 'terminated': raise APIError('Session is terminated') block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) except APIError, e: log.error('session-trickle: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: if candidates: log.msg('Trickled ICE candidate(s) for session %s' % session) else: log.msg('Trickled ICE candidates end for session %s' % session) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_session_terminate(self, request): # extract the fields to avoid going through the descriptor several times session = request.session try: try: session_info = self.sessions_map[session] except KeyError: raise APIError('Unknown session specified: %s' % session) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state) if session_info.direction == 'incoming' and session_info.state == 'connecting': data = {'request': 'decline', 'code': 486} else: data = {'request': 'hangup'} block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data)) except APIError, e: log.error('session-terminate: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('%s terminated session %s' % (session_info.account_id, session)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_videoroom_join(self, request): account = request.account session = request.session uri = request.uri sdp = request.sdp try: try: account_info = self.accounts_map[account] except KeyError: raise APIError('Unknown account specified: %s' % account) if session in self.videoroom_sessions: raise APIError('VideoRoom session ID (%s) already in use' % session) handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) # create the room if it doesn't exist try: videoroom = self.protocol.factory.videorooms[uri] except KeyError: videoroom = VideoRoom(uri) self.protocol.factory.videorooms.add(videoroom) data = {'request': 'create', 'room': videoroom.id, 'publishers': 10} try: block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) except Exception, e: code = getattr(e, 'code', -1) if code != 427: # 417 == room exists block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id)) self.protocol.backend.janus_set_event_handler(handle_id, None) raise APIError(str(e)) # join the room data = {'request': 'joinandconfigure', 'room': videoroom.id, 'ptype': 'publisher', 'audio': True, 'video': True} if account_info.display_name: data['display'] = account_info.display_name jsep = {'type': 'offer', 'sdp': sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep)) videoroom_session = VideoRoomSessionInfo(session) videoroom_session.janus_handle_id = handle_id videoroom_session.initialize(account, 'publisher', videoroom) self.videoroom_sessions.add(videoroom_session) except APIError, e: log.error('videoroom-join: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) self._maybe_destroy_videoroom(videoroom) else: log.msg('Video room %s from %s to %s joined' % (videoroom.id, account, uri)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='progress')) self._send_data(json.dumps(data)) def _OH_videoroom_ctl(self, request): if request.option == 'trickle': trickle = request.trickle if not trickle: log.error('videoroom-ctl: missing field') return candidates = [c.to_struct() for c in trickle.candidates] session = trickle.session or request.session try: try: videoroom_session = self.videoroom_sessions[session] except KeyError: raise APIError('trickle: unknown video room session ID specified: %s' % session) block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates)) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: if candidates: log.msg('Trickled ICE candidate(s) for videoroom session %s' % session) else: log.msg('Trickled ICE candidates end for videoroom session %s' % session) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-attach': feed_attach = request.feed_attach if not feed_attach: log.error('videoroom-ctl: missing field') return try: if feed_attach.session in self.videoroom_sessions: raise APIError('feed-attach: video room session ID (%s) already in use' % feed_attach.session) # get the 'base' session, the one used to join and publish try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('feed-attach: unknown video room session ID specified: %s' % request.session) # get the publisher's session try: publisher_session = base_session.room[feed_attach.publisher] except KeyError: raise APIError('feed-attach: unknown publisher video room session ID specified: %s' % feed_attach.publisher) if publisher_session.publisher_id is None: raise APIError('feed-attach: video room session ID does not have a publisher ID' % feed_attach.publisher) handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom')) self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom) # join the room as a listener data = {'request': 'join', 'room': base_session.room.id, 'ptype': 'listener', 'feed': publisher_session.publisher_id} block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data)) videoroom_session = VideoRoomSessionInfo(feed_attach.session) videoroom_session.janus_handle_id = handle_id videoroom_session.parent_session = base_session videoroom_session.publisher_id = publisher_session.id videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room) self.videoroom_sessions.add(videoroom_session) base_session.feeds[publisher_session.publisher_id] = publisher_session.id except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('Video room %s: %s attached to %s' % (base_session.room.id, base_session.account_id, feed_attach.publisher)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-answer': feed_answer = request.feed_answer if not feed_answer: log.error('videoroom-ctl: missing field') return try: try: videoroom_session = self.videoroom_sessions[request.feed_answer.session] except KeyError: raise APIError('feed-answer: unknown video room session ID specified: %s' % feed_answer.session) data = {'request': 'start', 'room': videoroom_session.room.id} jsep = {'type': 'answer', 'sdp': feed_answer.sdp} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep)) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) elif request.option == 'feed-detach': feed_detach = request.feed_detach if not feed_detach: log.error('videoroom-ctl: missing field') return try: try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('feed-detach: unknown video room session ID specified: %s' % request.session) try: videoroom_session = self.videoroom_sessions[feed_detach.session] except KeyError: raise APIError('feed-detach: unknown video room session ID specified: %s' % feed_detach.session) data = {'request': 'leave'} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data)) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) block_on(self.protocol.backend.janus_detach(self.janus_session_id, videoroom_session.janus_handle_id)) self.protocol.backend.janus_set_event_handler(videoroom_session.janus_handle_id, None) self.videoroom_sessions.remove(videoroom_session) try: janus_publisher_id = next(k for k, v in base_session.feeds.iteritems() if v == videoroom_session.publisher_id) except StopIteration: pass else: base_session.feeds.pop(janus_publisher_id) elif request.option == 'invite-participants': invite_participants = request.invite_participants if not invite_participants: log.error('videoroom-ctl: missing field') return try: try: base_session = self.videoroom_sessions[request.session] account_info = self.accounts_map[base_session.account_id] except KeyError: raise APIError('invite-participants: unknown video room session ID specified: %s' % request.session) except APIError, e: log.error('videoroom-ctl: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) for conn in self.protocol.factory.connections.difference([self]): if conn.connection_handler: conn.connection_handler._handle_conference_invite(account_info, base_session.room.uri, invite_participants.participants) else: log.error('videoroom-ctl: unsupported option: %s' % request.option) def _OH_videoroom_terminate(self, request): session = request.session try: try: videoroom_session = self.videoroom_sessions[session] except KeyError: raise APIError('Unknown video room session ID specified: %s' % session) data = {'request': 'leave'} block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data)) except APIError, e: log.error('videoroom-terminate: %s' % e) self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) else: log.msg('%s terminated video room session %s' % (videoroom_session.account_id, session)) self._send_response(sylkrtc.AckResponse(transaction=request.transaction)) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='terminated')) self._send_data(json.dumps(data)) self._cleanup_videoroom_session(videoroom_session) self._maybe_destroy_videoroom(videoroom_session.room) # Event handlers def _OH_janus_event_sip(self, data): handle_id = data['handle_id'] event_type = data['event_type'] event = data['event'] if event_type == 'event': self._janus_event_plugin_sip(data) elif event_type == 'webrtcup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return session_info.state = 'established' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) # TODO: SessionEvent model self._send_data(json.dumps(data)) elif event_type == 'hangup': try: session_info = self.session_handles_map[handle_id] except KeyError: log.msg('Could not find session for handle ID %s' % handle_id) return if session_info.state != 'terminated': session_info.state = 'terminated' code = event.get('code', 0) reason = event.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) # TODO: SessionEvent model self._send_data(json.dumps(data)) self._cleanup_session(session_info) log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), session_info.id, session_info.local_identity.uri, session_info.remote_identity.uri, reason)) elif event_type in ('media', 'detached'): # ignore pass else: log.warn('Received unexpected event type: %s' % event_type) def _janus_event_plugin_sip(self, data): handle_id = data['handle_id'] event = data['event'] plugin_data = event['plugindata'] assert(plugin_data['plugin'] == 'janus.plugin.sip') event_data = event['plugindata']['data'] assert(event_data.get('sip', '') == 'event') if 'result' not in event_data: log.warn('Unexpected event: %s' % event) return event_data = event_data['result'] jsep = event.get('jsep', None) event_type = event_data['event'] if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'): # skip 'registered' events from session handles if event_type == 'registered' and event_data['register_sent'] in (False, 'false'): return # account event try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return if event_type == 'incomingcall': originator_uri = SIP_PREFIX_RE.sub('', event_data['username']) originator_display_name = event_data.get('displayname', '').replace('"', '') jsep = event.get('jsep', None) assert jsep is not None session_id = uuid.uuid4().hex session = SIPSessionInfo(session_id) session.janus_handle_id = handle_id session.init_incoming(account_info.id, originator_uri, originator_display_name) self.sessions_map[session_id] = session self.session_handles_map[handle_id] = session data = dict(sylkrtc='account_event', account=account_info.id, session=session_id, event='incoming_session', data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp'])) log.msg('Incoming session %s %s <-> %s created' % (session.id, session.remote_identity.uri, session.local_identity.uri)) else: registration_state = event_type if registration_state == 'registration_failed': registration_state = 'failed' if account_info.registration_state == registration_state: return account_info.registration_state = registration_state registration_data = dict(state=registration_state) if registration_state == 'failed': code = event_data['code'] reason = event_data['reason'] registration_data['reason'] = '%d %s' % (code, reason) data = dict(sylkrtc='account_event', account=account_info.id, event='registration_state', data=registration_data) log.msg('Account %s registration state changed to %s' % (account_info.id, registration_state)) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type in ('calling', 'accepted', 'hangup'): # session event try: session_info = self.session_handles_map[handle_id] except KeyError: log.warn('Could not find session for handle ID %s' % handle_id) return if event_type == 'hangup' and session_info.state == 'terminated': return if event_type == 'calling': session_info.state = 'progress' elif event_type == 'accepted': session_info.state = 'accepted' elif event_type == 'hangup': session_info.state = 'terminated' data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state)) log.msg('%s session %s state: %s' % (session_info.direction.title(), session_info.id, session_info.state)) if session_info.state == 'accepted' and session_info.direction == 'outgoing': assert jsep is not None data['data']['sdp'] = jsep['sdp'] elif session_info.state == 'terminated': code = event_data.get('code', 0) reason = event_data.get('reason', 'Unknown') reason = '%d %s' % (code, reason) data['data']['reason'] = reason # TODO: SessionEvent model self._send_data(json.dumps(data)) if session_info.state == 'terminated': self._cleanup_session(session_info) log.msg('%s session %s %s <-> %s terminated (%s)' % (session_info.direction.title(), session_info.id, session_info.local_identity.uri, session_info.remote_identity.uri, reason)) # check if missed incoming call if session_info.direction == 'incoming' and code == 487: data = dict(sylkrtc='account_event', account=session_info.account_id, event='missed_session', data=dict(originator=session_info.remote_identity.__dict__)) log.msg('Incoming session from %s missed' % session_info.remote_identity.uri) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type == 'missed_call': try: account_info = self.account_handles_map[handle_id] except KeyError: log.warn('Could not find account for handle ID %s' % handle_id) return originator_uri = SIP_PREFIX_RE.sub('', event_data['caller']) originator_display_name = event_data.get('displayname', '').replace('"', '') # We have no session, so create an identity object by hand originator = SessionPartyIdentity(originator_uri, originator_display_name) data = dict(sylkrtc='account_event', account=account_info.id, event='missed_session', data=dict(originator=originator.__dict__)) log.msg('Incoming session from %s missed' % originator.uri) # TODO: AccountEvent model self._send_data(json.dumps(data)) elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'): # ignore pass else: log.warn('Unexpected SIP plugin event type: %s' % event_type) def _OH_janus_event_videoroom(self, data): handle_id = data['handle_id'] event_type = data['event_type'] - event = data['event'] if event_type == 'event': self._janus_event_plugin_videoroom(data) elif event_type == 'webrtcup': try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return base_session = videoroom_session.parent_session if base_session is None: data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='established')) else: # this is a subscriber session data = dict(sylkrtc='videoroom_event', session=base_session.id, event='feed_established', data=dict(state='established', subscription=videoroom_session.id)) log.msg('Videoroom session %s is established' % videoroom_session.id) self._send_data(json.dumps(data)) elif event_type == 'hangup': try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return self._cleanup_videoroom_session(videoroom_session) self._maybe_destroy_videoroom(videoroom_session.room) elif event_type in ('media', 'detached'): # ignore pass else: log.warn('Received unexpected event type: %s' % event_type) def _janus_event_plugin_videoroom(self, data): handle_id = data['handle_id'] event = data['event'] plugin_data = event['plugindata'] assert(plugin_data['plugin'] == 'janus.plugin.videoroom') event_data = event['plugindata']['data'] assert 'videoroom' in event_data event_type = event_data['videoroom'] if event_type == 'joined': # a join request succeeded, this is a publisher try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return room = videoroom_session.room videoroom_session.publisher_id = event_data['id'] room.add(videoroom_session) jsep = event.get('jsep', None) assert jsep is not None data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='accepted', sdp=jsep['sdp'])) self._send_data(json.dumps(data)) # send information about existing publishers publishers = [] for p in event_data['publishers']: publisher_id = p['id'] publisher_display = p.get('display', '') try: publisher_session = room[publisher_id] except KeyError: log.warn('Could not find matching session for publisher %s' % publisher_id) continue item = { 'id': publisher_session.id, 'uri': publisher_session.account_id, 'display_name': publisher_display, } publishers.append(item) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='initial_publishers', data=dict(publishers=publishers)) self._send_data(json.dumps(data)) elif event_type == 'event': if 'publishers' in event_data: try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return room = videoroom_session.room # send information about new publishers publishers = [] for p in event_data['publishers']: publisher_id = p['id'] publisher_display = p.get('display', '') try: publisher_session = room[publisher_id] except KeyError: log.warn('Could not find matching session for publisher %s' % publisher_id) continue item = { 'id': publisher_session.id, 'uri': publisher_session.account_id, 'display_name': publisher_display, } publishers.append(item) data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='publishers_joined', data=dict(publishers=publishers)) self._send_data(json.dumps(data)) elif 'leaving' in event_data: try: base_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return janus_publisher_id = event_data['leaving'] try: publisher_id = base_session.feeds.pop(janus_publisher_id) except KeyError: return data = dict(sylkrtc='videoroom_event', session=base_session.id, event='publishers_left', data=dict(publishers=[publisher_id])) self._send_data(json.dumps(data)) elif {'started', 'unpublished', 'left'}.intersection(event_data): # ignore pass else: log.warn('Received unexpected plugin "event" event') elif event_type == 'attached': # sent when a feed is subscribed for a given publisher try: videoroom_session = self.videoroom_sessions[handle_id] except KeyError: log.warn('Could not find videoroom session for handle ID %s' % handle_id) return # get the session which originated the subscription base_session = videoroom_session.parent_session assert base_session is not None jsep = event.get('jsep', None) assert jsep is not None assert jsep['type'] == 'offer' data = dict(sylkrtc='videoroom_event', session=base_session.id, event='feed_attached', data=dict(sdp=jsep['sdp'], subscription=videoroom_session.id)) self._send_data(json.dumps(data)) else: log.warn('Received unexpected plugin event type: %s' % event_type) - -class SylkWebSocketServerProtocol(WebSocketServerProtocol): - backend = None - connection_handler = None - - def onConnect(self, request): - log.msg('Incoming connection from %s (origin %s)' % (request.peer, request.origin)) - if SYLK_WS_PROTOCOL not in request.protocols: - log.msg('Rejecting connection, remote does not support our sub-protocol') - raise HttpException(406, 'No compatible protocol specified') - if not self.backend.ready: - log.msg('Rejecting connection, backend is not connected') - raise HttpException(503, 'Backend is not connected') - return SYLK_WS_PROTOCOL - - def onOpen(self): - log.msg('Connection from %s open' % self.transport.getPeer()) - self.factory.connections.add(self) - self.connection_handler = ConnectionHandler(self) - self.connection_handler.start() - - def onMessage(self, payload, is_binary): - if is_binary: - log.warn('Received invalid binary message') - return - if GeneralConfig.trace_websocket: - self.factory.ws_logger.msg("IN", ISOTimestamp.now(), payload) - try: - data = json.loads(payload) - except Exception, e: - log.warn('Error parsing WebSocket payload: %s' % e) - return - self.connection_handler.handle_message(data) - - def onClose(self, clean, code, reason): - if self.connection_handler is None: - # Very early connection closed, onOpen wasn't even called - return - log.msg('Connection from %s closed' % self.transport.getPeer()) - self.factory.connections.discard(self) - self.connection_handler.stop() - self.connection_handler = None - - def disconnect(self, code=1000, reason=u''): - self.sendClose(code, reason) - - -class SylkWebSocketServerFactory(WebSocketServerFactory): - implements(IObserver) - - protocol = SylkWebSocketServerProtocol - connections = set() - videorooms = VideoRoomContainer() - backend = None # assigned by WebHandler - - def __init__(self, *args, **kw): - super(SylkWebSocketServerFactory, self).__init__(*args, **kw) - notification_center = NotificationCenter() - notification_center.add_observer(self, name='JanusBackendDisconnected') - - def buildProtocol(self, addr): - protocol = self.protocol() - protocol.factory = self - protocol.backend = self.backend - return protocol - - def handle_notification(self, notification): - handler = getattr(self, '_NH_%s' % notification.name, Null) - handler(notification) - - def _NH_JanusBackendDisconnected(self, notification): - for conn in self.connections.copy(): - conn.failConnection() - self.videorooms.clear() diff --git a/sylk/applications/webrtcgateway/web/protocol.py b/sylk/applications/webrtcgateway/web/protocol.py new file mode 100644 index 0000000..94187f3 --- /dev/null +++ b/sylk/applications/webrtcgateway/web/protocol.py @@ -0,0 +1,65 @@ + +import json + +from autobahn.twisted.websocket import WebSocketServerProtocol +from sipsimple.util import ISOTimestamp + +try: + from autobahn.websocket.http import HttpException +except ImportError: + # AutoBahn 0.12 changed this + from autobahn.websocket import ConnectionDeny as HttpException + +from sylk.applications.webrtcgateway.configuration import GeneralConfig +from sylk.applications.webrtcgateway.logger import log +from sylk.applications.webrtcgateway.web.handler import ConnectionHandler + + +SYLK_WS_PROTOCOL = 'sylkRTC-1' + + +class SylkWebSocketServerProtocol(WebSocketServerProtocol): + backend = None + connection_handler = None + + def onConnect(self, request): + log.msg('Incoming connection from %s (origin %s)' % (request.peer, request.origin)) + if SYLK_WS_PROTOCOL not in request.protocols: + log.msg('Rejecting connection, remote does not support our sub-protocol') + raise HttpException(406, 'No compatible protocol specified') + if not self.backend.ready: + log.msg('Rejecting connection, backend is not connected') + raise HttpException(503, 'Backend is not connected') + return SYLK_WS_PROTOCOL + + def onOpen(self): + log.msg('Connection from %s open' % self.transport.getPeer()) + self.factory.connections.add(self) + self.connection_handler = ConnectionHandler(self) + self.connection_handler.start() + + def onMessage(self, payload, is_binary): + if is_binary: + log.warn('Received invalid binary message') + return + if GeneralConfig.trace_websocket: + self.factory.ws_logger.msg("IN", ISOTimestamp.now(), payload) + try: + data = json.loads(payload) + except Exception, e: + log.warn('Error parsing WebSocket payload: %s' % e) + return + self.connection_handler.handle_message(data) + + def onClose(self, clean, code, reason): + if self.connection_handler is None: + # Very early connection closed, onOpen wasn't even called + return + log.msg('Connection from %s closed' % self.transport.getPeer()) + self.factory.connections.discard(self) + self.connection_handler.stop() + self.connection_handler = None + + def disconnect(self, code=1000, reason=u''): + self.sendClose(code, reason) +