diff --git a/pushserver/api/routes/v2/push.py b/pushserver/api/routes/v2/push.py index c79c71b..e2da433 100644 --- a/pushserver/api/routes/v2/push.py +++ b/pushserver/api/routes/v2/push.py @@ -1,142 +1,142 @@ import json from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status from fastapi.responses import JSONResponse from fastapi.encoders import jsonable_encoder from pydantic import ValidationError from pushserver.models.requests import WakeUpRequest, PushRequest from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.storage.errors import StorageError from pushserver.resources.notification import handle_request from pushserver.resources.utils import (check_host, log_event, log_incoming_request, log_push_request) router = APIRouter() @router.post('/{account}/push', response_model=PushRequest) async def push_requests(account: str, request: Request, push_request: PushRequest, background_tasks: BackgroundTasks): host, port = request.client.host, request.client.port code, description, data = '', '', [] if check_host(host, settings.params.allowed_pool): request_id = f"{push_request.event}-{account}-{push_request.call_id}" if not settings.params.return_async: background_tasks.add_task(log_push_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(log_incoming_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(handle_request, wp_request=push_request, request_id=request_id) status_code, code = status.HTTP_202_ACCEPTED, 202 description, data = 'accepted for delivery', {} try: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': data}) except json.decoder.JSONDecodeError: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': {}}) else: storage = TokenStorage() try: storage_data = storage[account] except StorageError: error = HTTPException(status_code=500, detail="Internal error: storage") log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=f'500: {{\"detail\": \"{error.detail}\"}}') raise error expired_devices = [] log_push_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) if not storage_data: description, data = 'Push request was not sent: user not found', {"account": account} return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={'code': 404, 'description': description, 'data': data}) for device, push_parameters in storage_data.items(): push_parameters.update(push_request.__dict__) reversed_push_parameters = {} for item in push_parameters.keys(): value = push_parameters[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') reversed_push_parameters[item] = value # Use background_token for cancel if push_parameters['event'] == 'cancel' and push_parameters['background_token'] is not None: reversed_push_parameters['token'] = push_parameters['background_token'] try: wp = WakeUpRequest(**reversed_push_parameters) except ValidationError as e: error_msg = e.errors()[0]['msg'] log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=error_msg) content = jsonable_encoder({'code': 400, 'description': error_msg, 'data': ''}) return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=content) log_incoming_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp.__dict__) results = handle_request(wp, request_id=request_id) code = results.get('code') if code == 410: - expired_devices.append(device) - code = 200 + expired_devices.append((push_parameters['app_id'], push_parameters['device_id'])) + code = 200 description = 'push notification responses' data.append(results) - for device in expired_devices: - msg = f'Removing {device} from {account}' + for expired_device in expired_devices: + msg = f'Removing {expired_device[1]} from {account}' log_event(loggers=settings.params.loggers, - msg=msg, level='deb') - storage.remove(account, device) + msg=msg, level='info') + storage.remove(account, *expired_device) else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} log_event(loggers=settings.params.loggers, sg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/api/routes/v2/remove.py b/pushserver/api/routes/v2/remove.py index ecdf655..f81212d 100644 --- a/pushserver/api/routes/v2/remove.py +++ b/pushserver/api/routes/v2/remove.py @@ -1,98 +1,97 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status from fastapi.responses import JSONResponse from pushserver.models.requests import RemoveRequest, RemoveResponse from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.storage.errors import StorageError from pushserver.resources.utils import (check_host, log_event, log_remove_request) router = APIRouter() @router.delete('/{account}', response_model=RemoveResponse) async def remove_requests(account: str, request: Request, rm_request: RemoveRequest, background_tasks: BackgroundTasks): host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): request_id = f"{account}-{rm_request.app_id}-{rm_request.device_id}" if not settings.params.return_async: background_tasks.add_task(log_remove_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) background_tasks.add_task(log_remove_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() - background_tasks.add_task(storage.remove, account, request_id) - + background_tasks.add_task(storage.remove, account, rm_request.app_id, rm_request.device_id) return rm_request else: log_remove_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() try: storage_data = storage[account] except StorageError: error = HTTPException(status_code=500, detail="Internal error: storage") log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg=f'500: {{\"detail\": \"{error.detail}\"}}') raise error if not storage_data: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="User not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'User not found'} ) + device_id = f"{rm_request.app_id}-{rm_request.device_id}" try: - device = storage_data[request_id] + device = storage_data[device_id] except KeyError: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="Device or app_id not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'Not found'} ) else: - device = f"{rm_request.app_id}-{rm_request.device_id}" - storage.remove(account, device) + storage.remove(account, rm_request.app_id, rm_request.device_id) log_remove_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) msg = f'Removing {device}' log_event(loggers=settings.params.loggers, msg=msg, level='deb') return rm_request else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} log_event(loggers=settings.params.loggers, msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index dfb2c98..5aea6df 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,169 +1,169 @@ import os import _pickle as pickle import logging from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig from .errors import StorageError __all__ = 'TokenStorage' CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token key = f'{contact_params.app_id}-{contact_params.device_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() - def remove(self, account, device): + def remove(self, account, app_id, device_id): + key = f'{app_id}-{device_id}' try: - del self._tokens[account][device] + del self._tokens[account][key] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} try: for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): - tokens[f'{device.device_id}-{device.app_id}'] = {'device_id': device.device_id, 'token': device.device_token, + tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'background_token': device.background_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} except CQLEngineException as e: log_event(loggers=settings.params.loggers, msg=f'Get token(s) failed: {e}', level='error') raise StorageError return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None try: PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, device_token=token, background_token=background_token, platform=contact_params.platform, silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, user_agent=contact_params.user_agent) except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=f'Storing token failed: {e}', level='error') raise StorageError try: OpenSips.create(opensipskey=account, opensipsval='1') except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=e, level='error') raise StorageError - def remove(self, account, device): + def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) - app_id, device_id = device.split('-', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass else: # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS if not self[account]: try: OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: if CassandraConfig.debug: logging.getLogger('cassandra').setLevel(logging.DEBUG) else: logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage()