diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index c9db339..1d8d0ff 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,153 +1,156 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting from application.configuration.datatypes import NetworkAddress, StringList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress, VideoBitrate, VideoCodec __all__ = 'GeneralConfig', 'JanusConfig', 'get_room_config' # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicyItem(object): def __new__(cls, item): lowercase_item = item.lower() if lowercase_item in ('none', ''): return 'none' elif lowercase_item in ('any', 'all', '*'): return 'all' elif '@' in item: return SIPAddress(item) else: return Domain(item) class PolicySettingValue(object): def __init__(self, value): if isinstance(value, (tuple, list)): items = [str(x) for x in value] elif isinstance(value, str): items = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") self.items = {PolicyItem(item) for item in items} self.items.discard('none') def __repr__(self): return '{0.__class__.__name__}({1})'.format(self, sorted(self.items)) def match(self, uri): if 'all' in self.items: return True elif not self.items: return False uri = re.sub('^(sip:|sips:)', '', str(uri)) domain = uri.split('@')[-1] return uri in self.items or domain in self.items class ManagementInterfaceAddress(NetworkAddress): default_port = 20888 # Configuration objects class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_client = False websocket_ping_interval = 120 recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) + filesharing_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'files'))) http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) http_management_auth_secret = ConfigSetting(type=str, value=None) firebase_server_key = ConfigSetting(type=str, value=None) class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False max_bitrate = ConfigSetting(type=VideoBitrate, value=VideoBitrate(2016000)) # ~2 MBits/s video_codec = ConfigSetting(type=VideoCodec, value=VideoCodec('vp9')) class RoomConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' record = False access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) max_bitrate = ConfigSetting(type=VideoBitrate, value=JanusConfig.max_bitrate) video_codec = ConfigSetting(type=VideoCodec, value=JanusConfig.video_codec) class VideoroomConfiguration(object): video_codec = 'vp9' max_bitrate = 2016000 record = False recording_dir = None + filesharing_dir = None def __init__(self, data): self.__dict__.update(data) @property def janus_data(self): return dict(videocodec=self.video_codec, bitrate=self.max_bitrate, record=self.record, rec_dir=self.recording_dir) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = VideoroomConfiguration(dict(RoomConfig)) RoomConfig.reset() else: config = VideoroomConfiguration(dict(RoomConfig)) # use room defaults config.recording_dir = os.path.join(GeneralConfig.recording_dir, room) + config.filesharing_dir = os.path.join(GeneralConfig.filesharing_dir, room) return config diff --git a/sylk/applications/webrtcgateway/handler.py b/sylk/applications/webrtcgateway/handler.py index 686d056..c5058dc 100644 --- a/sylk/applications/webrtcgateway/handler.py +++ b/sylk/applications/webrtcgateway/handler.py @@ -1,1281 +1,1338 @@ import hashlib import json import random +import os import time import uuid from application.python import limit from application.python.weakref import defaultweakobjectmap -from application.system import makedirs +from application.system import makedirs, unlink from eventlib import coros, proc +from itertools import count from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.lookup import DNSLookup, DNSLookupError +from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import call_in_green_thread, run_in_green_thread +from shutil import copyfileobj, rmtree from string import maketrans from twisted.internet import reactor from typing import Generic, Container, Iterable, Sized, TypeVar, Dict, Set, Optional, Union +from werkzeug.exceptions import InternalServerError from . import push from .configuration import GeneralConfig, get_room_config from .janus import JanusBackend, JanusError, JanusSession, SIPPluginHandle, VideoroomPluginHandle from .logger import ConnectionLogger, VideoroomLogger from .models import sylkrtc, janus from .storage import TokenStorage class AccountInfo(object): # noinspection PyShadowingBuiltins def __init__(self, id, password, display_name=None, user_agent=None): self.id = id self.password = password self.display_name = display_name self.user_agent = user_agent self.registration_state = None self.janus_handle = None # type: Optional[SIPPluginHandle] @property def uri(self): return 'sip:' + self.id @property def user_data(self): return dict(username=self.uri, display_name=self.display_name, user_agent=self.user_agent, ha1_secret=self.password) class SessionPartyIdentity(object): def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name # todo: might need to replace this auto-resetting descriptor with a timer in case we need to know when the slow link state expired class SlowLinkState(object): def __init__(self): self.slow_link = False self.last_reported = 0 class SlowLinkDescriptor(object): __timeout__ = 30 # 30 seconds def __init__(self): self.values = defaultweakobjectmap(SlowLinkState) def __get__(self, instance, owner): if instance is None: return self state = self.values[instance] if state.slow_link and time.time() - state.last_reported > self.__timeout__: state.slow_link = False return state.slow_link def __set__(self, instance, value): state = self.values[instance] if value: state.last_reported = time.time() state.slow_link = bool(value) def __delete__(self, instance): raise AttributeError('Attribute cannot be deleted') class SIPSessionInfo(object): slow_download = SlowLinkDescriptor() slow_upload = SlowLinkDescriptor() # noinspection PyShadowingBuiltins def __init__(self, id): self.id = id self.direction = None self.state = None self.account = None # type: AccountInfo self.local_identity = None # type: SessionPartyIdentity self.remote_identity = None # type: SessionPartyIdentity self.janus_handle = None # type: SIPPluginHandle self.slow_download = False self.slow_upload = False def init_outgoing(self, account, destination): self.account = account self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account.id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account, originator, originator_display_name=''): self.account = account self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account.id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) class VideoroomSessionInfo(object): slow_download = SlowLinkDescriptor() slow_upload = SlowLinkDescriptor() # noinspection PyShadowingBuiltins def __init__(self, id, owner): self.id = id self.owner = owner self.account = None # type: AccountInfo self.type = None # publisher / subscriber self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers self.janus_handle = None # type: VideoroomPluginHandle self.room = None self.bitrate = None self.parent_session = None self.slow_download = False self.slow_upload = False self.feeds = PublisherFeedContainer() # keeps references to all the other participant's publisher feeds that we subscribed to # noinspection PyShadowingBuiltins def initialize(self, account, type, room): assert type in ('publisher', 'subscriber') self.account = account self.type = type self.room = room self.bitrate = room.config.max_bitrate def __repr__(self): return '<{0.__class__.__name__}: type={0.type!r} id={0.id!r} janus_handle={0.janus_handle!r}>'.format(self) class PublisherFeedContainer(object): """A container for the other participant's publisher sessions that we have subscribed to""" def __init__(self): self._publishers = set() self._id_map = {} # map publisher.id -> publisher and publisher.publisher_id -> publisher def add(self, session): assert session not in self._publishers assert session.id not in self._id_map and session.publisher_id not in self._id_map self._publishers.add(session) self._id_map[session.id] = self._id_map[session.publisher_id] = session def discard(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item if item in self._publishers else None if session is not None: self._publishers.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.publisher_id, None) def remove(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item self._publishers.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) def pop(self, item): # item can be any of session, session.id or session.publisher_id session = self._id_map[item] if item in self._id_map else item self._publishers.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) return session def clear(self): self._publishers.clear() self._id_map.clear() def __len__(self): return len(self._publishers) def __iter__(self): return iter(self._publishers) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._publishers class Videoroom(object): def __init__(self, uri): self.id = random.getrandbits(32) # janus needs numeric room names self.uri = uri self.config = get_room_config(uri) self.log = VideoroomLogger(self) self._active_participants = [] self._sessions = set() # type: Set[VideoroomSessionInfo] self._id_map = {} # type: Dict[Union[str, int], VideoroomSessionInfo] # map session.id -> session and session.publisher_id -> session + self._shared_files = [] if self.config.record: makedirs(self.config.recording_dir, 0o755) self.log.info('created (recording on)') else: self.log.info('created') @property def active_participants(self): return self._active_participants @active_participants.setter def active_participants(self, participant_list): unknown_participants = set(participant_list).difference(self._id_map) if unknown_participants: raise ValueError('unknown participant session id: {}'.format(', '.join(unknown_participants))) if self._active_participants != participant_list: self._active_participants = participant_list self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) self._update_bitrate() def add(self, session): assert session not in self._sessions assert session.publisher_id is not None assert session.publisher_id not in self._id_map and session.id not in self._id_map self._sessions.add(session) self._id_map[session.id] = self._id_map[session.publisher_id] = session self.log.info('{session.account.id} has joined the room'.format(session=session)) self._update_bitrate() if self._active_participants: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) + if self._shared_files: + session.owner.send(sylkrtc.VideoroomFileSharingEvent(session=session.id, files=self._shared_files)) def discard(self, session): if session in self._sessions: self._sessions.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.publisher_id, None) self.log.info('{session.account.id} has left the room'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() def remove(self, session): self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) self.log.info('{session.account.id} has left the room'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() def clear(self): for session in self._sessions: self.log.info('{session.account.id} has left the room'.format(session=session)) self._active_participants = [] + self._shared_files = [] self._sessions.clear() self._id_map.clear() def allow_uri(self, uri): config = self.config if config.access_policy == 'allow,deny': return config.allow.match(uri) and not config.deny.match(uri) else: return not config.deny.match(uri) or config.allow.match(uri) + def add_file(self, upload_request): + self._write_file(upload_request) + + def get_file(self, filename): + path = os.path.join(self.config.filesharing_dir, filename) + if os.path.exists(path): + return path + else: + raise LookupError('file does not exist') + + def _fix_path(self, path): + name, extension = os.path.splitext(path) + for x in count(0, step=-1): + path = '{}{}{}'.format(name, x or '', extension) + if not os.path.exists(path) and not os.path.islink(path): + return path + + @run_in_thread('file-io') + def _write_file(self, upload_request): + makedirs(self.config.filesharing_dir) + path = self._fix_path(os.path.join(self.config.filesharing_dir, upload_request.shared_file.filename)) + upload_request.shared_file.filename = os.path.basename(path) + try: + with open(path, 'wb') as output_file: + copyfileobj(upload_request.content, output_file) + except (OSError, IOError): + upload_request.had_error = True + unlink(path) + self._write_file_done(upload_request) + + @run_in_twisted_thread + def _write_file_done(self, upload_request): + if upload_request.had_error: + upload_request.deferred.errback(InternalServerError('could not save file')) + else: + self._shared_files.append(upload_request.shared_file) + for session in self._sessions: + session.owner.send(sylkrtc.VideoroomFileSharingEvent(session=session.id, files=[upload_request.shared_file])) + upload_request.deferred.callback('OK') + + def cleanup(self): + self._remove_files() + + @run_in_thread('file-io') + def _remove_files(self): + rmtree(self.config.filesharing_dir, ignore_errors=True) + def _update_bitrate(self): if self._sessions: if self._active_participants: # todo: should we use max_bitrate / 2 or max_bitrate for each active participant if there are 2 active participants? active_participant_bitrate = self.config.max_bitrate // len(self._active_participants) other_participant_bitrate = 100000 self.log.info('participant bitrate is {} (active) / {} (others)'.format(active_participant_bitrate, other_participant_bitrate)) for session in self._sessions: if session.id in self._active_participants: bitrate = active_participant_bitrate else: bitrate = other_participant_bitrate if session.bitrate != bitrate: session.bitrate = bitrate session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), async=True) else: bitrate = self.config.max_bitrate // limit(len(self._sessions) - 1, min=1) self.log.info('participant bitrate is {}'.format(bitrate)) for session in self._sessions: if session.bitrate != bitrate: session.bitrate = bitrate session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), async=True) # todo: make Videoroom be a context manager that is retained/released on enter/exit and implement __nonzero__ to be different from __len__ # todo: so that a videoroom is not accidentally released by the last participant leaving while a new participant waits to join # todo: this needs a new model for communication with janus and the client that is pseudo-synchronous (uses green threads) def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions SessionT = TypeVar('SessionT', SIPSessionInfo, VideoroomSessionInfo) class SessionContainer(Sized, Iterable[SessionT], Container[SessionT], Generic[SessionT]): def __init__(self): self._sessions = set() self._id_map = {} # map session.id -> session and session.janus_handle.id -> session def add(self, session): assert session not in self._sessions assert session.id not in self._id_map and session.janus_handle.id not in self._id_map self._sessions.add(session) self._id_map[session.id] = self._id_map[session.janus_handle.id] = session def discard(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item if item in self._sessions else None if session is not None: self._sessions.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.janus_handle.id, None) def remove(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.janus_handle.id) def pop(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.janus_handle.id) return session def clear(self): self._sessions.clear() self._id_map.clear() def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions class OperationName(str): __normalizer__ = maketrans('-', '_') @property def normalized(self): return self.translate(self.__normalizer__) class Operation(object): __slots__ = 'type', 'name', 'data' __types__ = 'request', 'event' # noinspection PyShadowingBuiltins def __init__(self, type, name, data): if type not in self.__types__: raise ValueError("Can't instantiate class {.__class__.__name__} with unknown type: {!r}".format(self, type)) self.type = type self.name = OperationName(name) self.data = data class APIError(Exception): pass class GreenEvent(object): def __init__(self): self._event = coros.event() def set(self): if self._event.ready(): return self._event.send(True) def is_set(self): return self._event.ready() def clear(self): if self._event.ready(): self._event.reset() def wait(self): return self._event.wait() # noinspection PyPep8Naming class ConnectionHandler(object): janus = JanusBackend() def __init__(self, protocol): self.protocol = protocol self.device_id = hashlib.md5(protocol.peer).digest().encode('base64').rstrip('=\n') self.janus_session = None # type: JanusSession self.accounts_map = {} # account ID -> account self.account_handles_map = {} # Janus handle ID -> account self.sip_sessions = SessionContainer() # type: SessionContainer[SIPSessionInfo] # incoming and outgoing SIP sessions self.videoroom_sessions = SessionContainer() # type: SessionContainer[VideoroomSessionInfo] # publisher and subscriber sessions in video rooms self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() self.log = ConnectionLogger(self) self.state = None self._stop_pending = False @run_in_green_thread def start(self): self.state = 'starting' try: self.janus_session = JanusSession() except Exception as e: self.state = 'failed' self.log.warning('could not create session, disconnecting: %s' % e) if self._stop_pending: # if stop was already called it means we were already disconnected self.stop() else: self.protocol.disconnect(3000, unicode(e)) else: self.state = 'started' self.ready_event.set() if self._stop_pending: self.stop() else: self.send(sylkrtc.ReadyEvent()) def stop(self): if self.state in (None, 'starting'): self._stop_pending = True return self.state = 'stopping' self._stop_pending = False if self.proc is not None: # Kill the operation's handler proc first, in order to not have any operations active while we cleanup. self.proc.kill() # Also proc.kill() will switch to another green thread, which is another reason to do it first so that self.proc = None # we do not switch to another green thread in the middle of the cleanup with a partially deleted handler if self.ready_event.is_set(): # Do not explicitly detach the janus plugin handles before destroying the janus session. Janus runs each request in a different # thread, so making detach and destroy request without waiting for the detach to finish can result in errors from race conditions. # Because we do not want to wait for them, we will rely instead on the fact that janus automatically detaches the plugin handles # when it destroys a session, so we only remove our event handlers and issue a destroy request for the session. for account_info in self.accounts_map.values(): if account_info.janus_handle is not None: self.janus.set_event_handler(account_info.janus_handle.id, None) for session in self.sip_sessions: if session.janus_handle is not None: self.janus.set_event_handler(session.janus_handle.id, None) for session in self.videoroom_sessions: if session.janus_handle is not None: self.janus.set_event_handler(session.janus_handle.id, None) session.room.discard(session) session.feeds.clear() self.janus_session.destroy() # this automatically detaches all plugin handles associated with it, no need to manually do it # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sip_sessions.clear() self.videoroom_sessions.clear() self.janus_session = None self.protocol = None self.state = 'stopped' def handle_message(self, message): try: request = sylkrtc.SylkRTCRequest.from_message(message) except sylkrtc.ProtocolError as e: self.log.error(str(e)) except Exception as e: self.log.error('{request_type}: {exception!s}'.format(request_type=message['sylkrtc'], exception=e)) if 'transaction' in message: self.send(sylkrtc.ErrorResponse(transaction=message['transaction'], error=str(e))) else: operation = Operation(type='request', name=request.sylkrtc, data=request) self.operations_queue.send(operation) def send(self, message): if self.protocol is not None: self.protocol.sendMessage(json.dumps(message.__data__)) # internal methods (not overriding / implementing the protocol API) def _cleanup_session(self, session): # should only be called from a green thread. if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.sip_sessions: self.sip_sessions.remove(session) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the same as the account handle, so don't session.janus_handle.detach() def _cleanup_videoroom_session(self, session): # should only be called from a green thread. if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.videoroom_sessions: self.videoroom_sessions.remove(session) if session.type == 'publisher': session.room.discard(session) session.feeds.clear() session.janus_handle.detach() self._maybe_destroy_videoroom(session.room) else: session.parent_session.feeds.discard(session.publisher_id) session.janus_handle.detach() def _maybe_destroy_videoroom(self, videoroom): # should only be called from a green thread. if self.protocol is None or self.janus_session is None: # The connection was closed, there is nothing to do return if videoroom in self.protocol.factory.videorooms and not videoroom: self.protocol.factory.videorooms.remove(videoroom) + videoroom.cleanup() with VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) as videoroom_handle: videoroom_handle.destroy(room=videoroom.id) videoroom.log.info('destroyed') def _lookup_sip_proxy(self, uri): # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) self.log.debug('DNS lookup for SIP proxy for {} yielded {}'.format(uri, route)) # Build a proxy URI Sofia-SIP likes return 'sips:{route.address}:{route.port}'.format(route=route) if route.transport == 'tls' else str(route.uri) def _handle_janus_sip_event(self, event): operation = Operation(type='event', name='janus-sip', data=event) self.operations_queue.send(operation) def _handle_janus_videoroom_event(self, event): operation = Operation(type='event', name='janus-videoroom', data=event) self.operations_queue.send(operation) def _operations_handler(self): self.ready_event.wait() while True: operation = self.operations_queue.wait() handler = getattr(self, '_OH_' + operation.type) handler(operation) del operation, handler def _OH_request(self, operation): handler = getattr(self, '_RH_' + operation.name.normalized) request = operation.data try: handler(request) except (APIError, DNSLookupError, JanusError) as e: self.log.error('{operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error='Internal error')) else: self.send(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_event(self, operation): handler = getattr(self, '_EH_' + operation.name.normalized) try: handler(operation.data) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) # Request handlers def _RH_account_add(self, request): if request.account in self.accounts_map: raise APIError('Account {request.account} already added'.format(request=request)) # check if domain is acceptable domain = request.account.partition('@')[2] if not {'*', domain}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(request.account, request.password, request.display_name, request.user_agent) self.accounts_map[account_info.id] = account_info self.log.info('added account {request.account} using {request.user_agent}'.format(request=request)) def _RH_account_remove(self, request): try: account_info = self.accounts_map.pop(request.account) except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) # cleanup in case the client didn't unregister before removing the account if account_info.janus_handle is not None: account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) self.log.info('removed account {request.account}'.format(request=request)) def _RH_account_register(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.account) if account_info.janus_handle is not None: # Destroy the existing plugin handle account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) account_info.janus_handle = None # Create a plugin handle account_info.janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) self.account_handles_map[account_info.janus_handle.id] = account_info account_info.janus_handle.register(account_info, proxy=proxy) def _RH_account_unregister(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) if account_info.janus_handle is not None: account_info.janus_handle.detach() self.account_handles_map.pop(account_info.janus_handle.id) account_info.janus_handle = None self.log.info('unregistered {request.account} from receiving incoming calls'.format(request=request)) def _RH_account_devicetoken(self, request): if request.account not in self.accounts_map: raise APIError('Unknown account specified: {request.account}'.format(request=request)) storage = TokenStorage() if request.old_token is not None: storage.remove(request.account, request.old_token) self.log.debug('removed token {request.old_token} for {request.account}'.format(request=request)) if request.new_token is not None: storage.add(request.account, request.new_token) self.log.debug('added token {request.new_token} for {request.account}'.format(request=request)) def _RH_session_create(self, request): if request.session in self.sip_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.uri) # Create a new plugin handle and 'register' it, without actually doing so janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) try: janus_handle.call(account_info, uri=request.uri, sdp=request.sdp, proxy=proxy) except Exception: janus_handle.detach() raise session_info = SIPSessionInfo(request.session) session_info.janus_handle = janus_handle session_info.init_outgoing(account_info, request.uri) self.sip_sessions.add(session_info) self.log.info('created outgoing session {request.session} from {request.account} to {request.uri}'.format(request=request)) def _RH_session_answer(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session {request.session}'.format(request=request)) if session_info.state != 'connecting': raise APIError('Invalid state for answering session {session.id}: {session.state}'.format(session=session_info)) session_info.janus_handle.accept(sdp=request.sdp) self.log.info('answered incoming session {session.id}'.format(session=session_info)) def _RH_session_trickle(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.state == 'terminated': raise APIError('Session {request.session} is terminated'.format(request=request)) session_info.janus_handle.trickle(request.candidates) if not request.candidates: self.log.debug('session {session.id} negotiated ICE'.format(session=session_info)) def _RH_session_terminate(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for terminating session {session.id}: {session.state}'.format(session=session_info)) if session_info.direction == 'incoming' and session_info.state == 'connecting': session_info.janus_handle.decline() else: session_info.janus_handle.hangup() self.log.info('requested termination for session {session.id}'.format(session=session_info)) def _RH_videoroom_join(self, request): if request.session in self.videoroom_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) try: videoroom = self.protocol.factory.videorooms[request.uri] except KeyError: videoroom = Videoroom(request.uri) self.protocol.factory.videorooms.add(videoroom) if not videoroom.allow_uri(request.account): self._maybe_destroy_videoroom(videoroom) raise APIError('{request.account} is not allowed to join room {request.uri}'.format(request=request)) try: videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) try: try: videoroom_handle.create(room=videoroom.id, config=videoroom.config, publishers=10) except JanusError as e: if e.code != 427: # 427 means room already exists raise videoroom_handle.join(room=videoroom.id, sdp=request.sdp, display_name=account_info.display_name) except Exception: videoroom_handle.detach() raise except Exception: self._maybe_destroy_videoroom(videoroom) raise videoroom_session = VideoroomSessionInfo(request.session, owner=self) videoroom_session.janus_handle = videoroom_handle videoroom_session.initialize(account_info, 'publisher', videoroom) self.videoroom_sessions.add(videoroom_session) self.send(sylkrtc.VideoroomSessionProgressEvent(session=videoroom_session.id)) self.log.info('created session {request.session} from account {request.account} to video room {request.uri}'.format(request=request)) def _RH_videoroom_leave(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) videoroom_session.janus_handle.leave() self.send(sylkrtc.VideoroomSessionTerminatedEvent(session=videoroom_session.id)) # safety net in case we do not get any answer for the leave request # todo: to be adjusted later after pseudo-synchronous communication with janus is implemented reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) self.log.info('leaving video room session {request.session}'.format(request=request)) def _RH_videoroom_configure(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) videoroom = videoroom_session.room # todo: should we send out events if the active participant list did not change? try: videoroom.active_participants = request.active_participants except ValueError as e: raise APIError(str(e)) for session in videoroom: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=videoroom.active_participants, originator=request.session)) def _RH_videoroom_feed_attach(self, request): if request.feed in self.videoroom_sessions: raise APIError('Video room session ID {request.feed} already in use'.format(request=request)) try: base_session = self.videoroom_sessions[request.session] # our 'base' session (the one used to join and publish) except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) try: publisher_session = base_session.room[request.publisher] # the publisher's session (the one we want to subscribe to) except KeyError: raise APIError('Unknown publisher video room session to attach to: {request.publisher}'.format(request=request)) if publisher_session.publisher_id is None: raise APIError('Video room session {session.id} does not have a publisher ID'.format(session=publisher_session)) videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) try: videoroom_handle.feed_attach(room=base_session.room.id, feed=publisher_session.publisher_id) except Exception: videoroom_handle.detach() raise videoroom_session = VideoroomSessionInfo(request.feed, owner=self) videoroom_session.janus_handle = videoroom_handle videoroom_session.parent_session = base_session videoroom_session.publisher_id = publisher_session.id videoroom_session.initialize(base_session.account, 'subscriber', base_session.room) self.videoroom_sessions.add(videoroom_session) base_session.feeds.add(publisher_session) def _RH_videoroom_feed_answer(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown video room session: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) videoroom_session.janus_handle.feed_start(sdp=request.sdp) def _RH_videoroom_feed_detach(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown video room session to detach: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) videoroom_session.janus_handle.feed_detach() # safety net in case we do not get any answer for the feed_detach request # todo: to be adjusted later after pseudo-synchronous communication with janus is implemented reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) def _RH_videoroom_invite(self, request): try: base_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) room = base_session.room participants = set(request.participants) originator = sylkrtc.SIPIdentity(uri=base_session.account.id, display_name=base_session.account.display_name) event = sylkrtc.AccountConferenceInviteEvent(account='placeholder', room=room.uri, originator=originator) for protocol in self.protocol.factory.connections.difference([self.protocol]): connection_handler = protocol.connection_handler for account in participants.intersection(connection_handler.accounts_map): event.account = account connection_handler.send(event) room.log.info('invitation from %s for %s', originator.uri, account) connection_handler.log.info('received an invitation from %s for %s to join video room %s', originator.uri, account, room.uri) for participant in participants: push.conference_invite(originator=originator.uri, destination=participant, room=room.uri) def _RH_videoroom_session_trickle(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) videoroom_session.janus_handle.trickle(request.candidates) if not request.candidates and videoroom_session.type == 'publisher': self.log.debug('video room session {session.id} negotiated ICE'.format(session=videoroom_session)) def _RH_videoroom_session_update(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) options = request.options.__data__ if options: videoroom_session.janus_handle.update_publisher(options) modified = ', '.join('{}={}'.format(key, options[key]) for key in options) self.log.info('updated video room session {request.session} with {modified}'.format(request=request, modified=modified)) # Event handlers def _EH_janus_sip(self, event): if isinstance(event, janus.PluginEvent): event_id = event.plugindata.data.__id__ try: handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) except AttributeError: self.log.warning('unhandled Janus SIP event: {event_name}'.format(event_name=event_id[-1])) else: self.log.debug('janus SIP event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) handler(event) else: # janus.CoreEvent try: handler = getattr(self, '_EH_janus_sip_' + event.janus) except AttributeError: self.log.warning('unhandled Janus SIP event: {event.janus}'.format(event=event)) else: self.log.debug('janus SIP event: {event.janus} (handle_id={event.sender})'.format(event=event)) handler(event) def _EH_janus_sip_error(self, event): # fixme: implement error handling self.log.error('got SIP error event: {}'.format(event.__data__)) handle_id = event.sender if handle_id in self.sip_sessions: pass # this is a session related event elif handle_id in self.account_handles_map: pass # this is an account related event def _EH_janus_sip_webrtcup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for webrtcup event'.format(event=event)) return session_info.state = 'established' self.send(sylkrtc.SessionEstablishedEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) self.log.info('established WEBRTC connection for session {session.id}'.format(session=session_info)) def _EH_janus_sip_hangup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: return if session_info.state != 'terminated': session_info.state = 'terminated' reason = event.reason or 'unspecified reason' self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) self._cleanup_session(session_info) def _EH_janus_sip_slowlink(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for slowlink event'.format(event=event)) return if event.uplink: # uplink is from janus' point of view if not session_info.slow_download: self.log.info('poor download connectivity for session {session.id}'.format(session=session_info)) session_info.slow_download = True else: if not session_info.slow_upload: self.log.info('poor upload connectivity for session {session.id}'.format(session=session_info)) session_info.slow_upload = True def _EH_janus_sip_media(self, event): pass def _EH_janus_sip_detached(self, event): pass def _EH_janus_sip_event_registering(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registering event'.format(event=event)) return if account_info.registration_state != 'registering': account_info.registration_state = 'registering' self.send(sylkrtc.AccountRegisteringEvent(account=account_info.id)) def _EH_janus_sip_event_registered(self, event): if event.sender in self.sip_sessions: # skip 'registered' events from outgoing session handles return try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registered event'.format(event=event)) return if account_info.registration_state != 'registered': account_info.registration_state = 'registered' self.send(sylkrtc.AccountRegisteredEvent(account=account_info.id)) self.log.info('registered {account.id} to receive incoming calls'.format(account=account_info)) def _EH_janus_sip_event_registration_failed(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for registration failed event'.format(event=event)) return if account_info.registration_state != 'failed': account_info.registration_state = 'failed' reason = '{result.code} {result.reason}'.format(result=event.plugindata.data.result) self.send(sylkrtc.AccountRegistrationFailedEvent(account=account_info.id, reason=reason)) self.log.info('registration for {account.id} failed: {reason}'.format(account=account_info, reason=reason)) def _EH_janus_sip_event_incomingcall(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for incoming call event'.format(event=event)) return assert event.jsep is not None data = event.plugindata.data.result # type: janus.SIPResultIncomingCall originator = sylkrtc.SIPIdentity(uri=data.username, display_name=data.displayname) session = SIPSessionInfo(uuid.uuid4().hex) session.janus_handle = account_info.janus_handle session.init_incoming(account_info, originator.uri, originator.display_name) self.sip_sessions.add(session) self.send(sylkrtc.AccountIncomingSessionEvent(account=account_info.id, session=session.id, originator=originator, sdp=event.jsep.sdp)) self.log.info('received incoming session {session.id} from {session.remote_identity.uri!s} to {session.local_identity.uri!s}'.format(session=session)) def _EH_janus_sip_event_missed_call(self, event): try: account_info = self.account_handles_map[event.sender] except KeyError: self.log.warning('could not find account with handle ID {event.sender} for missed call event'.format(event=event)) return data = event.plugindata.data.result # type: janus.SIPResultMissedCall originator = sylkrtc.SIPIdentity(uri=data.caller, display_name=data.displayname) self.send(sylkrtc.AccountMissedSessionEvent(account=account_info.id, originator=originator)) self.log.info('missed incoming call from {originator.uri}'.format(originator=originator)) def _EH_janus_sip_event_calling(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for calling event'.format(event=event)) return session_info.state = 'progress' self.send(sylkrtc.SessionProgressEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) def _EH_janus_sip_event_accepted(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for accepted event'.format(event=event)) return session_info.state = 'accepted' if session_info.direction == 'outgoing': assert event.jsep is not None self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id, sdp=event.jsep.sdp)) else: self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id)) self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) def _EH_janus_sip_event_hangup(self, event): try: session_info = self.sip_sessions[event.sender] except KeyError: self.log.warning('could not find SIP session with handle ID {event.sender} for hangup event'.format(event=event)) return if session_info.state != 'terminated': session_info.state = 'terminated' data = event.plugindata.data.result # type: janus.SIPResultHangup reason = '{0.code} {0.reason}'.format(data) self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) if session_info.direction == 'incoming' and data.code == 487: # incoming call was cancelled -> missed self.send(sylkrtc.AccountMissedSessionEvent(account=session_info.account.id, originator=session_info.remote_identity.__dict__)) if data.code >= 300: self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) else: self.log.info('{session.direction} session {session.id} terminated'.format(session=session_info)) self._cleanup_session(session_info) def _EH_janus_sip_event_declining(self, event): pass def _EH_janus_sip_event_hangingup(self, event): pass def _EH_janus_sip_event_proceeding(self, event): pass def _EH_janus_sip_event_progress(self, event): # TODO: implement this to handle 183 w/ SDP -Dan pass def _EH_janus_sip_event_ringing(self, event): # TODO: check if we want to use this -Dan pass def _EH_janus_videoroom(self, event): if isinstance(event, janus.PluginEvent): event_id = event.plugindata.data.__id__ try: handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) except AttributeError: self.log.warning('unhandled Janus videoroom event: {event_name}'.format(event_name=event_id[-1])) else: self.log.debug('janus videoroom event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) handler(event) else: # janus.CoreEvent try: handler = getattr(self, '_EH_janus_videoroom_' + event.janus) except AttributeError: self.log.warning('unhandled Janus videoroom event: {event.janus}'.format(event=event)) else: self.log.debug('janus videoroom event: {event.janus} (handle_id={event.sender})'.format(event=event)) handler(event) def _EH_janus_videoroom_error(self, event): # fixme: implement error handling self.log.error('got videoroom error event: {}'.format(event.__data__)) try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for error event'.format(event=event)) return if videoroom_session.type == 'publisher': pass else: pass def _EH_janus_videoroom_webrtcup(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for webrtcup event'.format(event=event)) return if videoroom_session.type == 'publisher': self.log.info('established WEBRTC connection for session {session.id}'.format(session=videoroom_session)) self.send(sylkrtc.VideoroomSessionEstablishedEvent(session=videoroom_session.id)) else: self.send(sylkrtc.VideoroomFeedEstablishedEvent(session=videoroom_session.parent_session.id, feed=videoroom_session.id)) def _EH_janus_videoroom_hangup(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: return self._cleanup_videoroom_session(videoroom_session) def _EH_janus_videoroom_slowlink(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for slowlink event'.format(event=event)) return if event.uplink: # uplink is from janus' point of view if not videoroom_session.slow_download: self.log.info('poor download connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) videoroom_session.slow_download = True else: if not videoroom_session.slow_upload: self.log.info('poor upload connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) videoroom_session.slow_upload = True def _EH_janus_videoroom_media(self, event): pass def _EH_janus_videoroom_detached(self, event): pass def _EH_janus_videoroom_joined(self, event): # send when a publisher successfully joined a room try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for joined event'.format(event=event)) return self.log.info('joined video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) data = event.plugindata.data # type: janus.VideoroomJoined videoroom_session.publisher_id = data.id room = videoroom_session.room assert event.jsep is not None self.send(sylkrtc.VideoroomSessionAcceptedEvent(session=videoroom_session.id, sdp=event.jsep.sdp)) # send information about existing publishers publishers = [] for publisher in data.publishers: # type: janus.VideoroomPublisher try: publisher_session = room[publisher.id] except KeyError: self.log.warning('could not find matching session for publisher {publisher.id} during joined event'.format(publisher=publisher)) else: publishers.append(dict(id=publisher_session.id, uri=publisher_session.account.id, display_name=publisher.display or '')) self.send(sylkrtc.VideoroomInitialPublishersEvent(session=videoroom_session.id, publishers=publishers)) room.add(videoroom_session) # adding the session to the room might also trigger sending an event with the active participants which must be sent last def _EH_janus_videoroom_attached(self, event): # sent when a feed is subscribed for a given publisher try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for attached event'.format(event=event)) return # get the session which originated the subscription base_session = videoroom_session.parent_session assert base_session is not None assert event.jsep is not None and event.jsep.type == 'offer' self.send(sylkrtc.VideoroomFeedAttachedEvent(session=base_session.id, feed=videoroom_session.id, sdp=event.jsep.sdp)) def _EH_janus_videoroom_slow_link(self, event): pass def _EH_janus_videoroom_event_publishers(self, event): try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: self.log.warning('could not find video room session with handle ID {event.sender} for publishers event'.format(event=event)) return room = videoroom_session.room # send information about new publishers publishers = [] for publisher in event.plugindata.data.publishers: # type: janus.VideoroomPublisher try: publisher_session = room[publisher.id] except KeyError: self.log.warning('could not find matching session for publisher {publisher.id} during publishers event'.format(publisher=publisher)) continue publishers.append(dict(id=publisher_session.id, uri=publisher_session.account.id, display_name=publisher.display or '')) self.send(sylkrtc.VideoroomPublishersJoinedEvent(session=videoroom_session.id, publishers=publishers)) def _EH_janus_videoroom_event_leaving(self, event): # this is a publisher publisher_id = event.plugindata.data.leaving # publisher_id == 'ok' when the event is about ourselves leaving the room, else the publisher's janus ID try: base_session = self.videoroom_sessions[event.sender] except KeyError: if publisher_id != 'ok': self.log.warning('could not find video room session with handle ID {event.sender} for leaving event'.format(event=event)) return if publisher_id == 'ok': self.log.info('left video room {session.room.uri} with session {session.id}'.format(session=base_session)) self._cleanup_videoroom_session(base_session) return try: publisher_session = base_session.feeds.pop(publisher_id) except KeyError: return self.send(sylkrtc.VideoroomPublishersLeftEvent(session=base_session.id, publishers=[publisher_session.id])) def _EH_janus_videoroom_event_left(self, event): # this is a subscriber try: videoroom_session = self.videoroom_sessions[event.sender] except KeyError: pass else: self._cleanup_videoroom_session(videoroom_session) def _EH_janus_videoroom_event_configured(self, event): pass def _EH_janus_videoroom_event_started(self, event): pass def _EH_janus_videoroom_event_unpublished(self, event): pass diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py index df4f3f5..8e78d4b 100644 --- a/sylk/applications/webrtcgateway/models/sylkrtc.py +++ b/sylk/applications/webrtcgateway/models/sylkrtc.py @@ -1,341 +1,357 @@ from application.python import subclasses from .jsonobjects import BooleanProperty, IntegerProperty, StringProperty, ArrayProperty, ObjectProperty, FixedValueProperty from .jsonobjects import JSONObject, JSONArray, StringArray, CompositeValidator from .validators import AORValidator, DisplayNameValidator, LengthValidator, UniqueItemsValidator # Base models (these are abstract and should not be used directly) class SylkRTCRequestBase(JSONObject): transaction = StringProperty() class SylkRTCResponseBase(JSONObject): transaction = StringProperty() class AccountRequestBase(SylkRTCRequestBase): account = StringProperty(validator=AORValidator()) class SessionRequestBase(SylkRTCRequestBase): session = StringProperty() class VideoroomRequestBase(SylkRTCRequestBase): session = StringProperty() class AccountEventBase(JSONObject): sylkrtc = FixedValueProperty('account-event') account = StringProperty(validator=AORValidator()) class SessionEventBase(JSONObject): sylkrtc = FixedValueProperty('session-event') session = StringProperty() class VideoroomEventBase(JSONObject): sylkrtc = FixedValueProperty('videoroom-event') session = StringProperty() class AccountRegistrationStateEvent(AccountEventBase): event = FixedValueProperty('registration-state') class SessionStateEvent(SessionEventBase): event = FixedValueProperty('state') class VideoroomSessionStateEvent(VideoroomEventBase): event = FixedValueProperty('session-state') # Miscellaneous models class SIPIdentity(JSONObject): uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True, validator=DisplayNameValidator()) class ICECandidate(JSONObject): candidate = StringProperty() sdpMLineIndex = IntegerProperty() sdpMid = StringProperty() class ICECandidates(JSONArray): item_type = ICECandidate class AORList(StringArray): list_validator = UniqueItemsValidator() item_validator = AORValidator() class VideoroomPublisher(JSONObject): id = StringProperty() uri = StringProperty(validator=AORValidator()) display_name = StringProperty(optional=True) class VideoroomPublishers(JSONArray): item_type = VideoroomPublisher class VideoroomActiveParticipants(StringArray): list_validator = CompositeValidator(UniqueItemsValidator(), LengthValidator(maximum=2)) class VideoroomSessionOptions(JSONObject): audio = BooleanProperty(optional=True) video = BooleanProperty(optional=True) bitrate = IntegerProperty(optional=True) +class SharedFile(JSONObject): + filename = StringProperty() + filesize = IntegerProperty() + uploader = ObjectProperty(SIPIdentity) # type: SIPIdentity + session = StringProperty() + + +class SharedFiles(JSONArray): + item_type = SharedFile + + # Response models class AckResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('ack') class ErrorResponse(SylkRTCResponseBase): sylkrtc = FixedValueProperty('error') error = StringProperty() # Connection events class ReadyEvent(JSONObject): sylkrtc = FixedValueProperty('ready-event') # Account events class AccountIncomingSessionEvent(AccountEventBase): event = FixedValueProperty('incoming-session') session = StringProperty() originator = ObjectProperty(SIPIdentity) # type: SIPIdentity sdp = StringProperty() class AccountMissedSessionEvent(AccountEventBase): event = FixedValueProperty('missed-session') originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountConferenceInviteEvent(AccountEventBase): event = FixedValueProperty('conference-invite') room = StringProperty(validator=AORValidator()) originator = ObjectProperty(SIPIdentity) # type: SIPIdentity class AccountRegisteringEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registering') class AccountRegisteredEvent(AccountRegistrationStateEvent): state = FixedValueProperty('registered') class AccountRegistrationFailedEvent(AccountRegistrationStateEvent): state = FixedValueProperty('failed') reason = StringProperty(optional=True) # Session events class SessionProgressEvent(SessionStateEvent): state = FixedValueProperty('progress') class SessionAcceptedEvent(SessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty(optional=True) # missing for incoming sessions class SessionEstablishedEvent(SessionStateEvent): state = FixedValueProperty('established') class SessionTerminatedEvent(SessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) # Video room events class VideoroomConfigureEvent(VideoroomEventBase): event = FixedValueProperty('configure') originator = StringProperty() active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomSessionProgressEvent(VideoroomSessionStateEvent): state = FixedValueProperty('progress') class VideoroomSessionAcceptedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('accepted') sdp = StringProperty() class VideoroomSessionEstablishedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('established') class VideoroomSessionTerminatedEvent(VideoroomSessionStateEvent): state = FixedValueProperty('terminated') reason = StringProperty(optional=True) class VideoroomFeedAttachedEvent(VideoroomEventBase): event = FixedValueProperty('feed-attached') feed = StringProperty() sdp = StringProperty() class VideoroomFeedEstablishedEvent(VideoroomEventBase): event = FixedValueProperty('feed-established') feed = StringProperty() class VideoroomInitialPublishersEvent(VideoroomEventBase): event = FixedValueProperty('initial-publishers') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersJoinedEvent(VideoroomEventBase): event = FixedValueProperty('publishers-joined') publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers class VideoroomPublishersLeftEvent(VideoroomEventBase): event = FixedValueProperty('publishers-left') publishers = ArrayProperty(StringArray) # type: StringArray +class VideoroomFileSharingEvent(VideoroomEventBase): + event = FixedValueProperty('file-sharing') + files = ArrayProperty(SharedFiles) + + # Account request models class AccountAddRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-add') password = StringProperty(validator=LengthValidator(minimum=1, maximum=9999)) display_name = StringProperty(optional=True) user_agent = StringProperty(optional=True) class AccountRemoveRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-remove') class AccountRegisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-register') class AccountUnregisterRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-unregister') class AccountDeviceTokenRequest(AccountRequestBase): sylkrtc = FixedValueProperty('account-devicetoken') old_token = StringProperty(optional=True) new_token = StringProperty(optional=True) # Session request models class SessionCreateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-create') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() class SessionAnswerRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-answer') sdp = StringProperty() class SessionTrickleRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class SessionTerminateRequest(SessionRequestBase): sylkrtc = FixedValueProperty('session-terminate') # Videoroom request models class VideoroomJoinRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-join') account = StringProperty(validator=AORValidator()) uri = StringProperty(validator=AORValidator()) sdp = StringProperty() class VideoroomLeaveRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-leave') class VideoroomConfigureRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-configure') active_participants = ArrayProperty(VideoroomActiveParticipants) # type: VideoroomActiveParticipants class VideoroomFeedAttachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-attach') publisher = StringProperty() feed = StringProperty() class VideoroomFeedAnswerRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-answer') feed = StringProperty() sdp = StringProperty() class VideoroomFeedDetachRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-feed-detach') feed = StringProperty() class VideoroomInviteRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-invite') participants = ArrayProperty(AORList) # type: AORList class VideoroomSessionTrickleRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-trickle') candidates = ArrayProperty(ICECandidates) # type: ICECandidates class VideoroomSessionUpdateRequest(VideoroomRequestBase): sylkrtc = FixedValueProperty('videoroom-session-update') options = ObjectProperty(VideoroomSessionOptions) # type: VideoroomSessionOptions # SylkRTC request to model mapping class ProtocolError(Exception): pass class SylkRTCRequest(object): __classmap__ = {cls.sylkrtc.value: cls for cls in subclasses(SylkRTCRequestBase) if hasattr(cls, 'sylkrtc')} @classmethod def from_message(cls, message): try: request_type = message['sylkrtc'] except KeyError: raise ProtocolError('could not get WebSocket message type') try: request_class = cls.__classmap__[request_type] except KeyError: raise ProtocolError('unknown WebSocket request: %s' % request_type) return request_class(**message) diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py index 4895e61..3946ffc 100644 --- a/sylk/applications/webrtcgateway/web.py +++ b/sylk/applications/webrtcgateway/web.py @@ -1,173 +1,228 @@ import json from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource -from twisted.internet import reactor +from twisted.internet import defer, reactor +from twisted.python.failure import Failure from twisted.web.server import Site +from werkzeug.exceptions import Forbidden, NotFound +from werkzeug.utils import secure_filename from sylk import __version__ as sylk_version from sylk.resources import Resources -from sylk.web import Klein, StaticFileResource, server +from sylk.web import File, Klein, StaticFileResource, server from . import push from .configuration import GeneralConfig, JanusConfig from .factory import SylkWebSocketServerFactory from .janus import JanusBackend from .logger import log +from .models import sylkrtc from .protocol import SYLK_WS_PROTOCOL from .storage import TokenStorage __all__ = 'WebHandler', 'AdminWebHandler' +class FileUploadRequest(object): + def __init__(self, shared_file, content): + self.deferred = defer.Deferred() + self.shared_file = shared_file + self.content = content + self.had_error = False + + class WebRTCGatewayWeb(object): __metaclass__ = Singleton app = Klein() def __init__(self, ws_factory): self._resource = self.app.resource() self._ws_resource = WebSocketResource(ws_factory) + self._ws_factory = ws_factory @property def resource(self): return self._resource @app.route('/', branch=True) def index(self, request): return StaticFileResource(Resources.get('html/webrtcgateway/')) @app.route('/ws') def ws(self, request): return self._ws_resource + @app.route('/filesharing///', methods=['OPTIONS', 'POST', 'GET']) + def filesharing(self, request, conference, session_id, filename): + conference_uri = conference.lower() + if conference_uri in self._ws_factory.videorooms: + videoroom = self._ws_factory.videorooms[conference_uri] + if session_id in videoroom: + request.setHeader('Access-Control-Allow-Origin', '*') + request.setHeader('Access-Control-Allow-Headers', 'content-type') + method = request.method.upper() + session = videoroom[session_id] + if method == 'POST': + def log_result(result): + if isinstance(result, Failure): + videoroom.log.warning('{file.uploader.uri} failed to upload {file.filename}: {error}'.format(file=upload_request.shared_file, error=result.value)) + else: + videoroom.log.info('{file.uploader.uri} has uploaded {file.filename}'.format(file=upload_request.shared_file)) + return result + + filename = secure_filename(filename) + filesize = int(request.getHeader('Content-Length')) + shared_file = sylkrtc.SharedFile(filename=filename, filesize=filesize, uploader=dict(uri=session.account.id, display_name=session.account.display_name), session=session_id) + session.owner.log.info('wants to upload file {filename} to video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) + upload_request = FileUploadRequest(shared_file, request.content) + videoroom.add_file(upload_request) + upload_request.deferred.addBoth(log_result) + return upload_request.deferred + elif method == 'GET': + filename = secure_filename(filename) + session.owner.log.info('wants to download file {filename} from video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) + try: + path = videoroom.get_file(filename) + except LookupError as e: + videoroom.log.warning('{session.account.id} failed to download {filename}: {error}'.format(session=session, filename=filename, error=e)) + raise NotFound() + else: + videoroom.log.info('{session.account.id} is downloading {filename}'.format(session=session, filename=filename)) + request.setHeader('Content-Disposition', 'attachment;filename=%s' % filename) + return File(path) + else: + return 'OK' + raise Forbidden() + class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, allowNullOrigin=GeneralConfig.web_origins == ['*'], autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.web = WebRTCGatewayWeb(self.factory) server.register_resource('webrtcgateway', self.web.resource) log.info('WebSocket handler started at %s' % ws_url) log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.info('Using Janus API: %s' % JanusConfig.api_url) self.backend = JanusBackend() self.backend.start() def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.dropConnection(abort=True) self.factory = None if self.backend is not None: self.backend.stop() self.backend = None # TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API # capabilities for other applications too. This could be done in a number of ways: # # * On the main web server, under a /admin/ parent route. # * On a separate web server, which could listen on a different IP and port. # # In either case, HTTPS aside, a token based authentication mechanism would be desired. # Which one is best is not 100% clear at this point. class AuthError(Exception): pass class AdminWebHandler(object): __metaclass__ = Singleton app = Klein() def __init__(self): self.listener = None def start(self): host, port = GeneralConfig.http_management_interface # noinspection PyUnresolvedReferences self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) log.info('Admin web handler started at http://%s:%d' % (host, port)) def stop(self): if self.listener is not None: self.listener.stopListening() self.listener = None # Admin web API def _check_auth(self, request): auth_secret = GeneralConfig.http_management_auth_secret if auth_secret: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if not auth_headers or auth_headers[0] != auth_secret: raise AuthError() @app.handle_errors(AuthError) def auth_error(self, request, failure): request.setResponseCode(403) return 'Authentication error' @app.route('/incoming_call', methods=['POST']) def incoming_session(self, request): self._check_auth(request) request.setHeader('Content-Type', 'application/json') try: data = json.load(request.content) originator = data['originator'] destination = data['destination'] except Exception as e: return json.dumps({'success': False, 'error': str(e)}) else: push.incoming_call(originator, destination) return json.dumps({'success': True}) @app.route('/missed_call', methods=['POST']) def missed_session(self, request): self._check_auth(request) request.setHeader('Content-Type', 'application/json') try: data = json.load(request.content) originator = data['originator'] destination = data['destination'] except Exception as e: return json.dumps({'success': False, 'error': str(e)}) else: push.missed_call(originator, destination) return json.dumps({'success': True}) @app.route('/tokens/') def get_tokens(self, request, account): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() tokens = storage[account] return json.dumps({'tokens': list(tokens)}) @app.route('/tokens//', methods=['POST', 'DELETE']) def process_token(self, request, account, token): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() if request.method == 'POST': storage.add(account, token) elif request.method == 'DELETE': storage.remove(account, token) return json.dumps({'success': True})