diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index 560917f..7f0e766 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,179 +1,179 @@ import os import _pickle as pickle import logging from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig from .errors import StorageError __all__ = 'TokenStorage' CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): token = contact_params.token background_token = None if contact_params.platform == 'apple': try: (token, background_token) = contact_params.token.split('-') except ValueError: pass contact_params.device_id = contact_params.device_id.strip("") data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token key = f'{contact_params.app_id}-{contact_params.device_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} try: for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'background_token': device.background_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} except CQLEngineException as e: log_event(loggers=settings.params.loggers, msg=f'Get token(s) failed: {e}', level='error') raise StorageError return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) token = contact_params.token background_token = None if contact_params.platform == 'apple': try: (token, background_token) = contact_params.token.split('-') except ValueError: pass contact_params.device_id = contact_params.device_id.strip("") try: PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, device_token=token, background_token=background_token, platform=contact_params.platform, silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, user_agent=contact_params.user_agent) except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=f'Storing token failed: {e}', level='error') raise StorageError try: OpenSips.create(opensipskey=account, opensipsval='1') except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=e, level='error') raise StorageError def remove(self, account, app_id='', device_id=''): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS if not self[account]: try: OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: - msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) + msg = 'Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: if CassandraConfig.debug: logging.getLogger('cassandra').setLevel(logging.DEBUG) else: logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage() diff --git a/sylk-pushserver b/sylk-pushserver index be51d15..374fa39 100755 --- a/sylk-pushserver +++ b/sylk-pushserver @@ -1,139 +1,138 @@ #!/usr/bin/env python3 import argparse -import datetime import logging import os import sys import uvicorn from pushserver import __info__ as package_info from application.process import process from application import log from pushserver.resources import settings from pushserver.resources.utils import (log_event, resources_available, ssl_cert, try_again) name = 'sylk-pushserver' fullname = "Sylk Pushserver" logging.getLogger("uvicorn").setLevel(logging.WARN) default_dir = '/etc/sylk-pushserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--ip", default='', help="If set, server will run in its address") parser.add_argument("--port", default='', help="If set, server will run in its address") parser.add_argument("--config_dir", default=None, metavar='PATH', help="Specify a config directory that contains " "general.ini, applications.ini and " "the credentials directory, " "Default it uses '/etc/sylk-pushserver'") parser.add_argument('--no-fork', action='store_false', dest='fork', help='log and run in the foreground') parser.add_argument("--debug", action="store_true", default=False, help="If set, log headers and body requests to log file.") args = parser.parse_args() if args.fork: try: from cysystemd.journal import JournaldLogHandler except ImportError: from systemd.journal import JournalHandler try: journal_handler = JournaldLogHandler() except NameError: journal_handler = JournalHandler(SYSLOG_IDENTIFIER='sylk-pushserver') log.set_handler(journal_handler) log.capture_output() root_logger = log.get_logger() root_logger.name = name if not args.fork: - console_formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') - log.set_default_formatter(console_formatter) + console_formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + log.set_default_formatter(console_formatter) log.info('Starting %s %s' % (fullname, package_info.__version__)) config_dir = default_dir if args.config_dir is not None: if not os.path.exists(f'{args.config_dir}'): - log.info(f'Specified config directory does not exist') + log.info('Specified config directory does not exist') sys.exit(1) config_dir = args.config_dir settings.init(config_dir, args.debug, args.ip, args.port) # Since TokenStorage config relies on the config_dir it has to be imported here process.configuration.local_directory = config_dir from pushserver.resources.storage import TokenStorage if __name__ == '__main__': if not settings.params.dir['error'] or 'default' in settings.params.dir['error']: storage = TokenStorage() storage.load() sock_available = False while not sock_available: host = settings.params.server['host'] port = int(settings.params.server['port']) tls_cert = settings.params.server['tls_cert'] sock_available = resources_available(host, port) if sock_available: if tls_cert: if ssl_cert(tls_cert): - msg = f'Starting app over SSL...' + msg = 'Starting app over SSL...' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='info') uvicorn.run('pushserver.resources.app:app', host=host, port=port, ssl_certfile=tls_cert, acces_log=False, log_level='error') break else: msg = f'{tls_cert} is not a valid ssl cert, app will be run without it' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='deb') uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: try_again(timer=30, host=host, port=port, start_error=settings.params.dir['error'], loggers=settings.params.loggers) else: log_event(loggers=settings.params.loggers, msg=settings.params.dir['error'], level='error') print(settings.params.dir['error'])