diff --git a/pushserver/models/__init__.py b/pushserver/models/__init__.py index b03d22f..7e7342d 100644 --- a/pushserver/models/__init__.py +++ b/pushserver/models/__init__.py @@ -1 +1 @@ -__all__ = ['requests'] \ No newline at end of file +__all__ = ['cassandra', 'requests'] diff --git a/pushserver/models/cassandra.py b/pushserver/models/cassandra.py new file mode 100644 index 0000000..896a560 --- /dev/null +++ b/pushserver/models/cassandra.py @@ -0,0 +1,21 @@ +from cassandra.cqlengine import columns +from cassandra.cqlengine.models import Model + + +class PushTokens(Model): + __table_name__ = 'push_tokens_tester' + username = columns.Text(partition_key=True) + domain = columns.Text(partition_key=True) + device_id = columns.Text(primary_key=True) + app_id = columns.Text(primary_key=True) + background_token = columns.Text(required=False) + device_token = columns.Text() + platform = columns.Text() + silent = columns.Text() + user_agent = columns.Text(required=False) + + +class OpenSips(Model): + __table_name__ = 'mobile_devices' + opensipskey = columns.Text(primary_key=True) + opensipsval = columns.Text() diff --git a/pushserver/resources/__init__.py b/pushserver/resources/__init__.py index 95bf8af..2ca12fb 100644 --- a/pushserver/resources/__init__.py +++ b/pushserver/resources/__init__.py @@ -1 +1 @@ -__all__ = ['notification', 'pns', 'settings', 'utils'] +__all__ = ['notification', 'pns', 'settings', 'utils', 'storage'] diff --git a/pushserver/resources/storage/__init__.py b/pushserver/resources/storage/__init__.py new file mode 100644 index 0000000..5ec9efe --- /dev/null +++ b/pushserver/resources/storage/__init__.py @@ -0,0 +1 @@ +from .storage import TokenStorage diff --git a/pushserver/resources/storage/configuration.py b/pushserver/resources/storage/configuration.py new file mode 100644 index 0000000..59a9a17 --- /dev/null +++ b/pushserver/resources/storage/configuration.py @@ -0,0 +1,56 @@ +import os +import sys + +from application.configuration import ConfigSection, ConfigSetting +from application.configuration.datatypes import HostnameList +from application.python.descriptor import classproperty + + +__all__ = 'CassandraConfig', 'ServerConfig' + + +class Path(str): + def __new__(cls, path): + if path: + path = os.path.normpath(path) + return str.__new__(cls, path) + + @property + def normalized(self): + return os.path.expanduser(self) + + +class VarResources(object): + """Provide access to Sylk-Pushserver's resources that should go in /var""" + + _cached_directory = None + + @classproperty + def directory(cls): + if cls._cached_directory is None: + binary_directory = os.path.dirname(os.path.realpath(sys.argv[0])) + if os.path.basename(binary_directory) == 'bin': + path = '/var' + else: + path = 'var' + cls._cached_directory = os.path.abspath(path) + return cls._cached_directory + + @classmethod + def get(cls, resource): + return os.path.join(cls.directory, resource or u'') + + +class CassandraConfig(ConfigSection): + __cfgfile__ = 'general.ini' + __section__ = 'Cassandra' + + cluster_contact_points = ConfigSetting(type=HostnameList, value=None) + keyspace = ConfigSetting(type=str, value='') + + +class ServerConfig(ConfigSection): + __cfgfile__ = 'general.ini' + __section__ = 'server' + + spool_dir = ConfigSetting(type=Path, value=Path(VarResources.get('spool/sylk-pushserver'))) diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py new file mode 100644 index 0000000..c8b905b --- /dev/null +++ b/pushserver/resources/storage/storage.py @@ -0,0 +1,147 @@ +import os +import _pickle as pickle + +from application.python.types import Singleton +from application.system import makedirs + +from collections import defaultdict + +from pushserver.resources import settings +from pushserver.resources.utils import log_event + +from .configuration import CassandraConfig, ServerConfig + + +__all__ = 'TokenStorage', + + +CASSANDRA_MODULES_AVAILABLE = False +try: + from cassandra.cqlengine import columns, connection +except ImportError: + pass +else: + try: + from cassandra.cqlengine.models import Model + except ImportError: + pass + else: + CASSANDRA_MODULES_AVAILABLE = True + from cassandra.cqlengine.query import LWTException + from cassandra.cluster import NoHostAvailable + from cassandra.io import asyncioreactor + from cassandra.policies import DCAwareRoundRobinPolicy + from pushserver.models.cassandra import PushTokens, OpenSips + + +class FileStorage(object): + def __init__(self): + self._tokens = defaultdict() + + def _save(self): + with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: + pickle.dump(self._tokens, f) + + def load(self): + try: + tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) + except Exception: + pass + else: + self._tokens.update(tokens) + + def __getitem__(self, key): + try: + return self._tokens[key] + except KeyError: + return {} + + def add(self, account, contact_params): + try: + (token, background_token) = contact_params.token.split('#') + except ValueError: + token = contact_params.token + background_token = None + + data = contact_params.__dict__ + data['token'] = token + data['background_token'] = background_token + + key = f'{contact_params.device_id}-{contact_params.app_id}' + if account in self._tokens: + self._tokens[account][key] = data + else: + self._tokens[account] = {key: data} + self._save() + + def remove(self, account, device): + try: + del self._tokens[account][device] + except KeyError: + pass + self._save() + + +class CassandraStorage(object): + def load(self): + try: + connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) + except NoHostAvailable: + msg='Not able to connect to any of the Cassandra contact points' + log_event(loggers=settings.params.loggers, msg=msg, level='error') + + def __getitem__(self, key): + def query_tokens(key): + username, domain = key.split('@', 1) + tokens = {} + for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): + tokens[f'{device.device_id}-{device.app_id}'] = {'device_id': device.device_id, 'token': device.device_token, + 'platform': device.platform, 'app_id': device.app_id, + 'silent': bool(int(device.silent))} + return tokens + return query_tokens(key) + + def add(self, account, contact_params): + username, domain = account.split('@', 1) + try: + (token, background_token) = contact_params.token.split('#') + except ValueError: + token = contact_params.token + background_token = None + + PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, + device_token=token, background_token=background_token, platform=contact_params.platform, + silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, + user_agent=contact_params.user_agent) + OpenSips.create(opensipskey=account, opensipsval='1') + + def remove(self, account, device): + username, domain = account.split('@', 1) + device_id, app_id = device.split('-', 1) + print(device) + try: + PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() + except LWTException: + pass + else: + # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS + if not self[account]: + try: + OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() + except LWTException: + pass + +class TokenStorage(object, metaclass=Singleton): + + def __new__(self): + configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) + if configuration.files: + msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) + log_event(loggers=settings.params.loggers, msg=msg, level='info') + makedirs(ServerConfig.spool_dir.normalized) + if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: + log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') + return CassandraStorage() + else: + log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') + return FileStorage() diff --git a/sylk-pushserver b/sylk-pushserver index 435314a..6b6fd76 100755 --- a/sylk-pushserver +++ b/sylk-pushserver @@ -1,105 +1,110 @@ #!/usr/bin/python3 import argparse import datetime import logging import os import sys import uvicorn +from application.process import process + from pushserver.resources import settings from pushserver.resources.utils import (log_event, resources_available, ssl_cert, try_again) logging.getLogger("uvicorn").setLevel(logging.WARN) default_dir = '/etc/sylk-pushserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--ip", default='', help="If set, server will run in its address") parser.add_argument("--port", default='', help="If set, server will run in its address") parser.add_argument("--config_dir", default=None, metavar='PATH', help="Specify a config directory that contains " "general.ini, applications.ini and " "the credentials directory, " "Default it uses '/etc/sylk-pushserver'") parser.add_argument("--debug", action="store_true", default=False, help="If set, log headers and body requests to log file.") args = parser.parse_args() logging.basicConfig(level=logging.INFO) logging.info(f'{datetime.datetime.now()} Starting Sylk Pushserver...') config_dir = default_dir if args.config_dir is not None: if not os.path.exists(f'{args.config_dir}'): logging.info(f'Specified config directory does not exist') sys.exit(1) config_dir = args.config_dir settings.init(config_dir, args.debug, args.ip, args.port) +# Since TokenStorage config relies on the config_dir it has to be imported here +process.configuration.local_directory = config_dir +from pushserver.resources.storage import TokenStorage if __name__ == '__main__': if not settings.params.dir['error'] or 'default' in settings.params.dir['error']: sock_available = False while not sock_available: host = settings.params.server['host'] port = int(settings.params.server['port']) tls_cert = settings.params.server['tls_cert'] sock_available = resources_available(host, port) if sock_available: if tls_cert: if ssl_cert(tls_cert): msg = f'Starting app over SSL...' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='info') uvicorn.run('pushserver.resources.app:app', host=host, port=port, ssl_certfile=tls_cert, acces_log=False, log_level='error') break else: msg = f'{tls_cert} is not a valid ssl cert, app will be run without it' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='deb') uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: try_again(timer=30, host=host, port=port, start_error=settings.params.dir['error'], loggers=settings.params.loggers) else: log_event(loggers=settings.params.loggers, msg=settings.params.dir['error'], level='error') print(settings.params.dir['error'])