diff --git a/config/general.ini.sample b/config/general.ini.sample index f98cffd..d485e1b 100644 --- a/config/general.ini.sample +++ b/config/general.ini.sample @@ -1,64 +1,67 @@ ; The values after the ; are the default values, uncomment them only if you ; want to make changes [server] ; host = 0.0.0.0 ; port = 8400 ; The file containing X.509 certificate and private key in unencrypted format ; If a certificate is set, the server will listen using TLS ; tls_certificate = '' ; by default the server will respond to the client after the outgoing ; request for the push notification is completed. If false, the server will ; reply imediately with 202. The result of the push notification can then ; be found only in the logs. This is designed for client that can block and ; cannot or do not want to wait for the push operation to be completed ; return_async = true ; by default any client is allowed to send requests to the server ; IP addresses and networks in CIDR notation are supported ; e.g: 10.10.10.0/24, 127.0.0.1, 192.168.1.2 ; allowed_hosts = [] ; by default logs go to the journal; uncomment below to also log to a file ; log_to_file = true ; log_file = /var/log/sylk-pushserver/push.log ; Base directory for files created by the token storage ; spool_dir = /var/spool/sylk-pushserver ; If debug is true, headers and payloads for the outgoing requests will also ; be logged ; debug = False [applications] ; paths are relative to the config directory, by default /etc/sylk-pushserver ; and if missing ./config from the curent directory ; mobile applications are configured in this file ; config_file = applications.ini ; credentials relative paths are relative to this directory ; credentials_folder = credentials ; more applications can be added to this directory ; extra_applications_dir = applications/ ; more pns can be added to this directory ; extra_pns_dir = pns/ [Cassandra] ; configuration for token storage to use a Cassandra cluster ; if nothing is set here it will use a pickle file to store the tokens if ; API version 2 is used ; Contact points to cassandra cluster ; cluster_contact_points = ; Keyspace to use to retrieve tokens ; keyspace = ; Table to use to store tokens, default it will use push_tokens ; table = + +; Debug cassandra +; debug = false diff --git a/pushserver/resources/storage/configuration.py b/pushserver/resources/storage/configuration.py index 6a0587a..57c725c 100644 --- a/pushserver/resources/storage/configuration.py +++ b/pushserver/resources/storage/configuration.py @@ -1,56 +1,57 @@ import os import sys from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import HostnameList from application.python.descriptor import classproperty __all__ = 'CassandraConfig', 'ServerConfig' class Path(str): def __new__(cls, path): if path: path = os.path.normpath(path) return str.__new__(cls, path) @property def normalized(self): return os.path.expanduser(self) class VarResources(object): """Provide access to Sylk-Pushserver's resources that should go in /var""" _cached_directory = None @classproperty def directory(cls): if cls._cached_directory is None: binary_directory = os.path.dirname(os.path.realpath(sys.argv[0])) if os.path.basename(binary_directory) == 'bin': path = '/var' else: path = 'var' cls._cached_directory = os.path.abspath(path) return cls._cached_directory @classmethod def get(cls, resource): return os.path.join(cls.directory, resource or u'') class CassandraConfig(ConfigSection): __cfgfile__ = 'general.ini' __section__ = 'Cassandra' cluster_contact_points = ConfigSetting(type=HostnameList, value=None) keyspace = ConfigSetting(type=str, value='') table = ConfigSetting(type=str, value='') + debug = False class ServerConfig(ConfigSection): __cfgfile__ = 'general.ini' __section__ = 'server' spool_dir = ConfigSetting(type=Path, value=Path(VarResources.get('spool/sylk-pushserver'))) diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index ee0336b..72d3c16 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,149 +1,155 @@ import os import _pickle as pickle +import logging + from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig __all__ = 'TokenStorage', CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token key = f'{contact_params.device_id}-{contact_params.app_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, device): try: del self._tokens[account][device] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.device_id}-{device.app_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, device_token=token, background_token=background_token, platform=contact_params.platform, silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, user_agent=contact_params.user_agent) OpenSips.create(opensipskey=account, opensipsval='1') def remove(self, account, device): username, domain = account.split('@', 1) device_id, app_id = device.split('-', 1) print(device) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass else: # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS if not self[account]: try: OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: + if CassandraConfig.debug: + logging.getLogger('cassandra').setLevel(logging.DEBUG) + else: + logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage()