diff --git a/debian/control b/debian/control index 98f35fa..9feab95 100644 --- a/debian/control +++ b/debian/control @@ -1,48 +1,49 @@ Source: sylkserver Section: net Priority: optional Maintainer: Dan Pascu Uploaders: Adrian Georgescu Build-Depends: debhelper (>= 11), dh-python, python-all (>= 2.7), rename Standards-Version: 4.5.0 Package: sylkserver Architecture: all Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 2.8.0), python-autobahn, python-eventlib, python-klein, python-lxml, python-sipsimple (>= 3.5.0), python-systemd, python-twisted, python-typing Suggests: libavahi-compat-libdnssd1, python-wokkel, sylkserver-webrtc-gateway Recommends: sylkserver-sounds Description: Extensible real-time-communications application server SylkServer is a SIP applications server that provides applications like echo, playback and conference, as well as act as a gateway between SIP and IRC, XMPP and WEBRTC. Package: sylkserver-sounds Architecture: all Depends: ${misc:Depends} Description: Extensible real-time-communications application server sounds SylkServer is a SIP applications server that provides applications like echo, playback and conference, as well as act as a gateway between SIP and IRC, XMPP and WEBRTC. . This package contains sounds used by SylkServer. Package: sylkserver-webrtc-gateway Architecture: all Depends: ${misc:Depends}, sylkserver, janus +Recommends: python-cassandra (>=3.7.1-2.1) Description: Extensible real-time-communications application server WebRTC gateway SylkServer is a SIP applications server that provides applications like echo, playback and conference, as well as act as a gateway between SIP and IRC, XMPP and WEBRTC. . This is a meta-package containing the dependencies required to run the WebRTC gateway application. diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index 7dcf6dd..bba0999 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,156 +1,164 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting -from application.configuration.datatypes import NetworkAddress, StringList +from application.configuration.datatypes import NetworkAddress, StringList, HostnameList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress, VideoBitrate, VideoCodec __all__ = 'GeneralConfig', 'JanusConfig', 'get_room_config' # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicyItem(object): def __new__(cls, item): lowercase_item = item.lower() if lowercase_item in ('none', ''): return 'none' elif lowercase_item in ('any', 'all', '*'): return 'all' elif '@' in item: return SIPAddress(item) else: return Domain(item) class PolicySettingValue(object): def __init__(self, value): if isinstance(value, (tuple, list)): items = [str(x) for x in value] elif isinstance(value, str): items = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") self.items = {PolicyItem(item) for item in items} self.items.discard('none') def __repr__(self): return '{0.__class__.__name__}({1})'.format(self, sorted(self.items)) def match(self, uri): if 'all' in self.items: return True elif not self.items: return False uri = re.sub('^(sip:|sips:)', '', str(uri)) domain = uri.split('@')[-1] return uri in self.items or domain in self.items class ManagementInterfaceAddress(NetworkAddress): default_port = 20888 # Configuration objects class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_client = False websocket_ping_interval = 120 recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) filesharing_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'files'))) http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) http_management_auth_secret = ConfigSetting(type=str, value=None) sylk_push_url = ConfigSetting(type=str, value=None) class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False max_bitrate = ConfigSetting(type=VideoBitrate, value=VideoBitrate(2016000)) # ~2 MBits/s video_codec = ConfigSetting(type=VideoCodec, value=VideoCodec('vp9')) +class CassandraConfig(ConfigSection): + __cfgfile__ = 'webrtcgateway.ini' + __section__ = 'Cassandra' + + cluster_contact_points = ConfigSetting(type=HostnameList, value=None) + keyspace = ConfigSetting(type=str, value='') + + class RoomConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' record = False access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) max_bitrate = ConfigSetting(type=VideoBitrate, value=JanusConfig.max_bitrate) video_codec = ConfigSetting(type=VideoCodec, value=JanusConfig.video_codec) class VideoroomConfiguration(object): video_codec = 'vp9' max_bitrate = 2016000 record = False recording_dir = None filesharing_dir = None def __init__(self, data): self.__dict__.update(data) @property def janus_data(self): return dict(videocodec=self.video_codec, bitrate=self.max_bitrate, record=self.record, rec_dir=self.recording_dir) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = VideoroomConfiguration(dict(RoomConfig)) RoomConfig.reset() else: config = VideoroomConfiguration(dict(RoomConfig)) # use room defaults config.recording_dir = os.path.join(GeneralConfig.recording_dir, room) config.filesharing_dir = os.path.join(GeneralConfig.filesharing_dir, room) return config diff --git a/sylk/applications/webrtcgateway/push.py b/sylk/applications/webrtcgateway/push.py index cb49d2a..3646940 100644 --- a/sylk/applications/webrtcgateway/push.py +++ b/sylk/applications/webrtcgateway/push.py @@ -1,72 +1,78 @@ import json from twisted.internet import defer, reactor from twisted.web.client import Agent from twisted.web.iweb import IBodyProducer from twisted.web.http_headers import Headers from zope.interface import implementer from .configuration import GeneralConfig from .logger import log from .models import sylkpush from .storage import TokenStorage __all__ = 'conference_invite' agent = Agent(reactor) headers = Headers({'User-Agent': ['SylkServer'], 'Content-Type': ['application/json']}) @implementer(IBodyProducer) class StringProducer(object): def __init__(self, data): self.body = data self.length = len(data) def startProducing(self, consumer): consumer.write(self.body) return defer.succeed(None) def pauseProducing(self): pass def stopProducing(self): pass +def _construct_and_send(result, request): + for device_id, push_parameters in result.iteritems(): + try: + request.token = push_parameters['token'].split('#')[1] + except IndexError: + request.token = push_parameters['token'] + request.app_id = push_parameters['app'] + request.platform = push_parameters['platform'] + request.device_id = device_id + _send_push_notification(json.dumps(request.__data__)) def conference_invite(originator, destination, room, call_id): tokens = TokenStorage() request = sylkpush.ConferenceInviteEvent(token='dummy', app_id='dummy', platform='dummy', device_id='dummy', originator=originator.uri, from_display_name=originator.display_name, to=room, call_id=str(call_id)) - if isinstance(tokens[destination], set): + user_tokens = tokens[destination] + if isinstance(user_tokens, set): return else: - for device_id, push_parameters in tokens[destination].iteritems(): - try: - request.token = push_parameters['token'].split('#')[1] - except IndexError: - request.token = push_parameters['token'] - request.app_id = push_parameters['app'] - request.platform = push_parameters['platform'] - request.device_id = device_id - _send_push_notification(json.dumps(request.__data__)) + if isinstance(user_tokens, defer.Deferred): + user_tokens.addCallback(lambda result: _construct_and_send(result, request)) + else: + _construct_and_send(user_tokens, request) @defer.inlineCallbacks def _send_push_notification(payload): if GeneralConfig.sylk_push_url: try: r = yield agent.request('POST', GeneralConfig.sylk_push_url, headers, StringProducer(payload)) except Exception as e: log.info('Error sending push notification: %s', e) else: if r.code != 200: log.warning('Error sending push notification: %s', r.phrase) else: log.debug('Sent push notification: %s', payload) else: log.warning('Cannot send push notification: no Firebase server key configured') diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 6835d5f..24b888a 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,67 +1,130 @@ import cPickle as pickle import os from application.python.types import Singleton from collections import defaultdict from sipsimple.threading import run_in_thread +from twisted.internet import defer from sylk.configuration import ServerConfig +from .configuration import CassandraConfig __all__ = 'TokenStorage', -# TODO: This implementation is a prototype. It should be refactored to store tokens in a -# distributed DB so other SylkServer instances can access them. Also add some metadata -# like the modification date so we know when a token was refreshed, and thus it's ok to -# scrap it after a reasonable amount of time. +# TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, +# and thus it's ok to scrap it after a reasonable amount of time. +CASSANDRA_MODULES_AVAILABLE = False +try: + from cassandra.cqlengine import columns, connection +except ImportError: + pass +else: + try: + from cassandra.cqlengine.models import Model + except ImportError: + pass + else: + CASSANDRA_MODULES_AVAILABLE = True + class PushTokens(Model): + username = columns.Text(partition_key=True) + domain = columns.Text(partition_key=True) + device_id = columns.Text(primary_key=True) + app = columns.Text() + device_token = columns.Text() + platform = columns.Text() + silent = columns.Text() + user_agent = columns.Text(required=False) -class TokenStorage(object): - __metaclass__ = Singleton +class FileStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): data = { 'token': contact_params['pn_tok'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app': contact_params['pn_app'] } if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} self._tokens[account][contact_params['pn_device']] = data else: self._tokens[account] = {contact_params['pn_device']: data} self._save() def remove(self, account, device_id): try: del self._tokens[account][device_id] - except KeyError as e: + except KeyError: pass self._save() + + +class CassandraStorage(object): + @run_in_thread('cassandra') + def load(self): + connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, protocol_version=4) + + def __getitem__(self, key): + deferred = defer.Deferred() + + @run_in_thread('cassandra') + def query_tokens(key): + username, domain = key.split('@', 1) + tokens = {} + for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): + tokens[device.device_id] = {'token': device.device_token, 'platform': device.platform, + 'silent': device.silent, 'app': device.app} + deferred.callback(tokens) + return tokens + query_tokens(key) + return deferred + + @run_in_thread('cassandra') + def add(self, account, contact_params): + username, domain = account.split('@', 1) + PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], + device_token=contact_params['pn_tok'], platform=contact_params['pn_type'], + silent=contact_params['pn_silent'], app=contact_params['pn_app']) + + @run_in_thread('cassandra') + def remove(self, account, device_id): + username, domain = account.split('@', 1) + PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id).if_exists().delete() + + +class TokenStorage(object): + __metaclass__ = Singleton + + def __new__(self): + if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: + return CassandraStorage() + else: + return FileStorage() diff --git a/sylk/applications/webrtcgateway/web.py b/sylk/applications/webrtcgateway/web.py index fd9a3e6..6d9e734 100644 --- a/sylk/applications/webrtcgateway/web.py +++ b/sylk/applications/webrtcgateway/web.py @@ -1,198 +1,201 @@ import json from application.python.types import Singleton from autobahn.twisted.resource import WebSocketResource from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.web.server import Site from werkzeug.exceptions import Forbidden, NotFound from werkzeug.utils import secure_filename from sylk import __version__ as sylk_version from sylk.resources import Resources from sylk.web import File, Klein, StaticFileResource, server from . import push from .configuration import GeneralConfig, JanusConfig from .factory import SylkWebSocketServerFactory from .janus import JanusBackend from .logger import log from .models import sylkrtc from .protocol import SYLK_WS_PROTOCOL from .storage import TokenStorage __all__ = 'WebHandler', 'AdminWebHandler' class FileUploadRequest(object): def __init__(self, shared_file, content): self.deferred = defer.Deferred() self.shared_file = shared_file self.content = content self.had_error = False class WebRTCGatewayWeb(object): __metaclass__ = Singleton app = Klein() def __init__(self, ws_factory): self._resource = self.app.resource() self._ws_resource = WebSocketResource(ws_factory) self._ws_factory = ws_factory @property def resource(self): return self._resource @app.route('/', branch=True) def index(self, request): return StaticFileResource(Resources.get('html/webrtcgateway/')) @app.route('/ws') def ws(self, request): return self._ws_resource @app.route('/filesharing///', methods=['OPTIONS', 'POST', 'GET']) def filesharing(self, request, conference, session_id, filename): conference_uri = conference.lower() if conference_uri in self._ws_factory.videorooms: videoroom = self._ws_factory.videorooms[conference_uri] if session_id in videoroom: request.setHeader('Access-Control-Allow-Origin', '*') request.setHeader('Access-Control-Allow-Headers', 'content-type') method = request.method.upper() session = videoroom[session_id] if method == 'POST': def log_result(result): if isinstance(result, Failure): videoroom.log.warning('{file.uploader.uri} failed to upload {file.filename}: {error}'.format(file=upload_request.shared_file, error=result.value)) else: videoroom.log.info('{file.uploader.uri} has uploaded {file.filename}'.format(file=upload_request.shared_file)) return result filename = secure_filename(filename) filesize = int(request.getHeader('Content-Length')) shared_file = sylkrtc.SharedFile(filename=filename, filesize=filesize, uploader=dict(uri=session.account.id, display_name=session.account.display_name), session=session_id) session.owner.log.info('wants to upload file {filename} to video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) upload_request = FileUploadRequest(shared_file, request.content) videoroom.add_file(upload_request) upload_request.deferred.addBoth(log_result) return upload_request.deferred elif method == 'GET': filename = secure_filename(filename) session.owner.log.info('wants to download file {filename} from video room {conference_uri} with session {session_id}'.format(filename=filename, conference_uri=conference_uri, session_id=session_id)) try: path = videoroom.get_file(filename) except LookupError as e: videoroom.log.warning('{session.account.id} failed to download {filename}: {error}'.format(session=session, filename=filename, error=e)) raise NotFound() else: videoroom.log.info('{session.account.id} is downloading {filename}'.format(session=session, filename=filename)) request.setHeader('Content-Disposition', 'attachment;filename=%s' % filename) return File(path) else: return 'OK' raise Forbidden() class WebHandler(object): def __init__(self): self.backend = None self.factory = None self.resource = None self.web = None def start(self): ws_url = 'ws' + server.url[4:] + '/webrtcgateway/ws' self.factory = SylkWebSocketServerFactory(ws_url, protocols=[SYLK_WS_PROTOCOL], server='SylkServer/%s' % sylk_version) self.factory.setProtocolOptions(allowedOrigins=GeneralConfig.web_origins, allowNullOrigin=GeneralConfig.web_origins == ['*'], autoPingInterval=GeneralConfig.websocket_ping_interval, autoPingTimeout=GeneralConfig.websocket_ping_interval/2) self.web = WebRTCGatewayWeb(self.factory) server.register_resource('webrtcgateway', self.web.resource) log.info('WebSocket handler started at %s' % ws_url) log.info('Allowed web origins: %s' % ', '.join(GeneralConfig.web_origins)) log.info('Allowed SIP domains: %s' % ', '.join(GeneralConfig.sip_domains)) log.info('Using Janus API: %s' % JanusConfig.api_url) self.backend = JanusBackend() self.backend.start() def stop(self): if self.factory is not None: for conn in self.factory.connections.copy(): conn.dropConnection(abort=True) self.factory = None if self.backend is not None: self.backend.stop() self.backend = None # TODO: This implementation is a prototype. Moving forward it probably makes sense to provide admin API # capabilities for other applications too. This could be done in a number of ways: # # * On the main web server, under a /admin/ parent route. # * On a separate web server, which could listen on a different IP and port. # # In either case, HTTPS aside, a token based authentication mechanism would be desired. # Which one is best is not 100% clear at this point. class AuthError(Exception): pass class AdminWebHandler(object): __metaclass__ = Singleton app = Klein() def __init__(self): self.listener = None def start(self): host, port = GeneralConfig.http_management_interface # noinspection PyUnresolvedReferences self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host) log.info('Admin web handler started at http://%s:%d' % (host, port)) def stop(self): if self.listener is not None: self.listener.stopListening() self.listener = None # Admin web API def _check_auth(self, request): auth_secret = GeneralConfig.http_management_auth_secret if auth_secret: auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None) if not auth_headers or auth_headers[0] != auth_secret: raise AuthError() @app.handle_errors(AuthError) def auth_error(self, request, failure): request.setResponseCode(403) return 'Authentication error' @app.route('/tokens/') def get_tokens(self, request, account): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() tokens = storage[account] - return json.dumps({'tokens': tokens}) + if isinstance(tokens, defer.Deferred): + return tokens.addCallback(lambda result: json.dumps({'tokens': result})) + else: + return json.dumps({'tokens': tokens}) @app.route('/tokens//', methods=['DELETE']) def process_token(self, request, account, device_id): self._check_auth(request) request.setHeader('Content-Type', 'application/json') storage = TokenStorage() if request.method == 'DELETE': storage.remove(account, device_id) return json.dumps({'success': True}) diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample index 3f3ad08..50a5f55 100644 --- a/webrtcgateway.ini.sample +++ b/webrtcgateway.ini.sample @@ -1,67 +1,74 @@ ; SylkServer WebRTC gateway configuration file ; ; For the gateway to work Janus needs to be properly installed and configured, ; please refer to README.webrtc for detailed instructions ; [General] ; List of allowed web origins. The connection origin (Origin header in the ; HTTP request) will be checked against the list defined here, if the domain ; is no allowed the connection will be refused. ; * (the default) means any ; web_origins = * ; Proxy used for outbound SIP traffic ; outbound_sip_proxy = ; List of allowed SIP domains for managing accounts ; sip_domains = * ; Boolean indicating if the WebSocket messages sent to/from clients should be ; logged to a file ; trace_client = False ; WebSocket Ping frames are sent at the configured interval, this helps detect ; dead client connections ; websocket_ping_interval = 120 ; IP and port for the HTTP management interface as IP[:PORT] ; http_management_interface = 127.0.0.1:20888 ; Shared secret for the HTTP management interface (Authorization: THE_KEY) ; http_management_auth_secret = ; Sylk-push URL to send conference push notification ; sylk_push_url = [Janus] ; URL pointing to the Janus API endpoint (only WebSocket is supported) ; api_url = ws://127.0.0.1:8188 ; API secret shared with Janus (must match the value in janus.cfg) ; A random UUID value is recommended, a new value can be generated with ; the following command: ; python -c 'import uuid; print(uuid.uuid4().hex)' api_secret = 0745f2f74f34451c89343afcdcae5809 ; Boolean indicating if the messages between SylkServer and Janus should be ; logged to a file ; trace_janus = False ; Maximum video bitrate allowed per sender in a room in bits/s. This value is ; applied to any room that doesn't define its own. The value is any integer ; number between 64000 and 4194304. Default value is 2016000 (~2Mb/s). ; max_bitrate = 2016000 ; The video codec to be used by all participants in a room. This value is ; applied to any room that doesn't define its own. ; Possible values are: h264, vp8 and vp9. Default is vp9. ; video_codec = vp9 +[Cassandra] +; Contact points to cassandra cluster +; cluster_contact_points = + +; Keyspace to use to retrieve tokens +; keyspace = + ; Per room configuration options ; [room1@videoconference.example.com] ; record = True ; access_policy = deny, allow ; deny = all ; allow = domain1.com, test1@example.com, test2@example.com ; max_bitrate = 512000 ; video_codec = h264