diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index 72d3c16..b9dfe5b 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,155 +1,162 @@ import os import _pickle as pickle import logging from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig __all__ = 'TokenStorage', CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True + from cassandra import InvalidRequest from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token key = f'{contact_params.device_id}-{contact_params.app_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, device): try: del self._tokens[account][device] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.device_id}-{device.app_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None - PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, - device_token=token, background_token=background_token, platform=contact_params.platform, - silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, - user_agent=contact_params.user_agent) - OpenSips.create(opensipskey=account, opensipsval='1') + try: + PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, + device_token=token, background_token=background_token, platform=contact_params.platform, + silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, + user_agent=contact_params.user_agent) + except InvalidRequest as e: + log_event(loggers=settings.params.loggers, msg=f'Storing token failed: {e}', level='error') + try: + OpenSips.create(opensipskey=account, opensipsval='1') + except InvalidRequest as e: + log_event(loggers=settings.params.loggers, msg=e, level='error') def remove(self, account, device): username, domain = account.split('@', 1) device_id, app_id = device.split('-', 1) print(device) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass else: # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS if not self[account]: try: OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: if CassandraConfig.debug: logging.getLogger('cassandra').setLevel(logging.DEBUG) else: logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage()