diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index 798a8cf..564caa2 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,210 +1,211 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting from application.configuration.datatypes import NetworkAddress, StringList, HostnameList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress, VideoBitrate, VideoCodec __all__ = 'GeneralConfig', 'JanusConfig', 'get_room_config', 'ExternalAuthConfig', 'get_auth_config' # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicyItem(object): def __new__(cls, item): lowercase_item = item.lower() if lowercase_item in ('none', ''): return 'none' elif lowercase_item in ('any', 'all', '*'): return 'all' elif '@' in item: return SIPAddress(item) else: return Domain(item) class PolicySettingValue(object): def __init__(self, value): if isinstance(value, (tuple, list)): items = [str(x) for x in value] elif isinstance(value, str): items = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") self.items = {PolicyItem(item) for item in items} self.items.discard('none') def __repr__(self): return '{0.__class__.__name__}({1})'.format(self, sorted(self.items)) def match(self, uri): if 'all' in self.items: return True elif not self.items: return False uri = re.sub('^(sip:|sips:)', '', str(uri)) domain = uri.split('@')[-1] return uri in self.items or domain in self.items class ManagementInterfaceAddress(NetworkAddress): default_port = 20888 class AuthType(str): allowed_values = ('SIP', 'IMAP') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) # Configuration objects class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_client = False websocket_ping_interval = 120 recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) filesharing_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'files'))) http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) http_management_auth_secret = ConfigSetting(type=str, value=None) sylk_push_url = ConfigSetting(type=str, value=None) class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False max_bitrate = ConfigSetting(type=VideoBitrate, value=VideoBitrate(2016000)) # ~2 MBits/s video_codec = ConfigSetting(type=VideoCodec, value=VideoCodec('vp9')) decline_code = 486 class CassandraConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Cassandra' cluster_contact_points = ConfigSetting(type=HostnameList, value=None) keyspace = ConfigSetting(type=str, value='') + push_tokens_table = ConfigSetting(type=str, value='') class RoomConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' record = False access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) max_bitrate = ConfigSetting(type=VideoBitrate, value=JanusConfig.max_bitrate) video_codec = ConfigSetting(type=VideoCodec, value=JanusConfig.video_codec) class VideoroomConfiguration(object): video_codec = 'vp9' max_bitrate = 2016000 record = False recording_dir = None filesharing_dir = None def __init__(self, data): self.__dict__.update(data) @property def janus_data(self): return dict(videocodec=self.video_codec, bitrate=self.max_bitrate, record=self.record, rec_dir=self.recording_dir) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = VideoroomConfiguration(dict(RoomConfig)) RoomConfig.reset() else: config = VideoroomConfiguration(dict(RoomConfig)) # use room defaults config.recording_dir = os.path.join(GeneralConfig.recording_dir, room) config.filesharing_dir = os.path.join(GeneralConfig.filesharing_dir, room) return config class ExternalAuthConfig(ConfigSection): __cfgfile__ = 'auth.ini' __section__ = 'ExternalAuth' enable = False # this can't be per-server due to limitations in imaplib imap_ca_cert_file = ConfigSetting(type=str, value='/etc/ssl/certs/ca-certificates.crt') class AuthConfig(ConfigSection): __cfgfile__ = 'auth.ini' auth_type = ConfigSetting(type=AuthType, value=AuthType('SIP')) imap_server = ConfigSetting(type=str, value='') class AuthConfiguration(object): auth_type = AuthType('SIP') def __init__(self, data): self.__dict__.update(data) def get_auth_config(domain): config_file = ConfigFile(AuthConfig.__cfgfile__) section = config_file.get_section(domain) if section is not None: AuthConfig.read(section=domain) config = AuthConfiguration(dict(AuthConfig)) AuthConfig.reset() else: config = AuthConfiguration(dict(AuthConfig)) # use auth defaults return config diff --git a/sylk/applications/webrtcgateway/errors.py b/sylk/applications/webrtcgateway/errors.py new file mode 100644 index 0000000..0b5c637 --- /dev/null +++ b/sylk/applications/webrtcgateway/errors.py @@ -0,0 +1,2 @@ +class StorageError(Exception): + pass diff --git a/sylk/applications/webrtcgateway/models/storage/cassandra.py b/sylk/applications/webrtcgateway/models/storage/cassandra.py new file mode 100644 index 0000000..b25d1ee --- /dev/null +++ b/sylk/applications/webrtcgateway/models/storage/cassandra.py @@ -0,0 +1,16 @@ + +from cassandra.cqlengine import columns +from cassandra.cqlengine.models import Model + + +class PushTokens(Model): + __table_name__ = 'push_tokens' + username = columns.Text(partition_key=True) + domain = columns.Text(partition_key=True) + device_id = columns.Text(primary_key=True) + app_id = columns.Text(primary_key=True) + background_token = columns.Text(required=False) + device_token = columns.Text() + platform = columns.Text() + silent = columns.Text() + user_agent = columns.Text(required=False) diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 5b55617..dfa8022 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,170 +1,186 @@ import pickle as pickle import os from application.python.types import Singleton from collections import defaultdict from sipsimple.threading import run_in_thread from twisted.internet import defer from sylk.configuration import ServerConfig from .configuration import CassandraConfig +from .errors import StorageError +from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True + from cassandra import InvalidRequest + from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException - class PushTokens(Model): - username = columns.Text(partition_key=True) - domain = columns.Text(partition_key=True) - device_id = columns.Text() - app = columns.Text() - background_token = columns.Text(required=False) - device_token = columns.Text(primary_key=True) - platform = columns.Text() - silent = columns.Text() - user_agent = columns.Text(required=False) + from cassandra.cluster import NoHostAvailable + from cassandra.policies import DCAwareRoundRobinPolicy + from .models.storage.cassandra import PushTokens + if CassandraConfig.push_tokens_table: + PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], - 'app': contact_params['pn_app'], + 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token } + key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] + # Remove old storage layout based on token + if token in self._tokens[account]: + del self._tokens[account][token] + # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except IndexError: pass - self._tokens[account][token] = data + self._tokens[account][key] = data else: - self._tokens[account] = {token: data} + self._tokens[account] = {key: data} self._save() - def remove(self, account, device_token): - try: - device_token = device_token.split('#')[0] - except IndexError: - pass + def remove(self, account, app_id, device_id): + key = f'{app_id}-{device_id}' try: - del self._tokens[account][device_token] + del self._tokens[account][key] except KeyError: pass self._save() +class CassandraConnection(object, metaclass=Singleton): + @run_in_thread('cassandra') + def __init__(self): + try: + self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4) + except NoHostAvailable: + self.log.error("Not able to connect to any of the Cassandra contact points") + raise StorageError + + class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): - connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, protocol_version=4) + CassandraConnection(); def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): - tokens[device.device_token] = {'device_id': device.device_id, 'platform': device.platform, - 'silent': device.silent, 'app': device.app} + tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.token, + 'platform': device.platform, 'silent': device.silent, + 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == contact_params['pn_tok']).if_exists().delete() except LWTException: pass - PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], - device_token=token, background_token=background_token, platform=contact_params['pn_type'], - silent=contact_params['pn_silent'], app=contact_params['pn_app'], user_agent=user_agent) + try: + PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], + device_token=token, background_token=background_token, platform=contact_params['pn_type'], + silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) + except (CQLEngineException, InvalidRequest) as e: + self.logger.error(f'Storing token failed: {e}') + raise StorageError @run_in_thread('cassandra') - def remove(self, account, device_token): + def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: device_token = device_token.split('#')[0] except IndexError: pass try: - PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == device_token).if_exists().delete() + PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample index 574382c..051305d 100644 --- a/webrtcgateway.ini.sample +++ b/webrtcgateway.ini.sample @@ -1,80 +1,83 @@ ; SylkServer WebRTC gateway configuration file ; ; For the gateway to work Janus needs to be properly installed and configured, ; please refer to README.webrtc for detailed instructions ; [General] ; List of allowed web origins. The connection origin (Origin header in the ; HTTP request) will be checked against the list defined here, if the domain ; is no allowed the connection will be refused. ; * (the default) means any ; web_origins = * ; Proxy used for outbound SIP traffic ; outbound_sip_proxy = ; List of allowed SIP domains for managing accounts ; sip_domains = * ; Boolean indicating if the WebSocket messages sent to/from clients should be ; logged to a file ; trace_client = False ; WebSocket Ping frames are sent at the configured interval, this helps detect ; dead client connections ; websocket_ping_interval = 120 ; IP and port for the HTTP management interface as IP[:PORT] ; http_management_interface = 127.0.0.1:20888 ; Shared secret for the HTTP management interface (Authorization: THE_KEY) ; http_management_auth_secret = ; Sylk-push URL to send conference push notification ; sylk_push_url = [Janus] ; URL pointing to the Janus API endpoint (only WebSocket is supported) ; api_url = ws://127.0.0.1:8188 ; API secret shared with Janus (must match the value in janus.cfg) ; A random UUID value is recommended, a new value can be generated with ; the following command: ; python -c 'import uuid; print(uuid.uuid4().hex)' api_secret = 0745f2f74f34451c89343afcdcae5809 ; Boolean indicating if the messages between SylkServer and Janus should be ; logged to a file ; trace_janus = False ; Maximum video bitrate allowed per sender in a room in bits/s. This value is ; applied to any room that doesn't define its own. The value is any integer ; number between 64000 and 4194304. Default value is 2016000 (~2Mb/s). ; max_bitrate = 2016000 ; The video codec to be used by all participants in a room. This value is ; applied to any room that doesn't define its own. ; Possible values are: h264, vp8 and vp9. Default is vp9. ; video_codec = vp9 ; code used to decline the calls (usually, 486 busy, 603 busy everywhere). ; For 4XX codes, tipically a SIP Proxy will wait until other devices answer, ; for 6XX codes, tipically a SIP Proxy will end the call forking ; decline_code = 486 [Cassandra] ; Contact points to cassandra cluster ; cluster_contact_points = -; Keyspace to use to retrieve tokens +; Keyspace to use for storage ; keyspace = +; Table name for token storage +; push_tokens_table = push_tokens + ; Per room configuration options ; [room1@videoconference.example.com] ; record = True ; access_policy = deny, allow ; deny = all ; allow = domain1.com, test1@example.com, test2@example.com ; max_bitrate = 512000 ; video_codec = h264