diff --git a/pushserver/api/routes/v2/push.py b/pushserver/api/routes/v2/push.py index 6ef60ba..19c6db8 100644 --- a/pushserver/api/routes/v2/push.py +++ b/pushserver/api/routes/v2/push.py @@ -1,241 +1,243 @@ import json from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status from fastapi.responses import JSONResponse from fastapi.encoders import jsonable_encoder from pydantic import ValidationError from typing import Optional from pushserver.models.requests import WakeUpRequest, PushRequest from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.storage.errors import StorageError from pushserver.resources.notification import handle_request from pushserver.resources.utils import (check_host, log_event, log_incoming_request, log_push_request) router = APIRouter() async def task_push(account: str, push_request: PushRequest, request_id: str, host: str, device: Optional[str] = None): code, description, data = '', '', [] storage = TokenStorage() try: storage_data = storage[account] except StorageError: log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=f'500: {{\"detail\": \"{error.detail}\"}}') return expired_devices = [] if not storage_data: # Push request was not sent: user not found + storage.remove(account) return for device_key, push_parameters in storage_data.items(): if device is not None and device != push_parameters['device_id']: continue push_parameters.update(push_request.__dict__) reversed_push_parameters = {} for item in push_parameters.keys(): value = push_parameters[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') reversed_push_parameters[item] = value # Use background_token for cancel if push_parameters['event'] == 'cancel' and push_parameters['background_token'] is not None: reversed_push_parameters['token'] = push_parameters['background_token'] try: wp = WakeUpRequest(**reversed_push_parameters) except ValidationError as e: error_msg = e.errors()[0]['msg'] log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=error_msg) return log_incoming_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp.__dict__) results = handle_request(wp, request_id=request_id) code = results.get('code') if code == 410: expired_devices.append((push_parameters.app_id, push_parameters.device_id)) code = 200 description = 'push notification responses' data.append(results) for device in expired_devices: msg = f'Removing {device[1]} from {account}' log_event(loggers=settings.params.loggers, msg=msg, level='deb') storage.remove(account, *device) if code == '': description, data = 'Push request was not sent: device not found', {"device_id": push_parameters['device_id']} log_event(loggers=settings.params.loggers, msg=f'{description} {data}', level='warn') else: log_event(loggers=settings.params.loggers, msg=f'{description} {data}', level='deb') @router.post('/{account}/push', response_model=PushRequest) @router.post('/{account}/push/{device}', response_model=PushRequest) async def push_requests(account: str, request: Request, push_request: PushRequest, background_tasks: BackgroundTasks, device: Optional[str] = None): host, port = request.client.host, request.client.port code, description, data = '', '', [] if check_host(host, settings.params.allowed_pool): request_id = f"{push_request.event}-{account}-{push_request.call_id}" if not settings.params.return_async: background_tasks.add_task(log_push_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(log_incoming_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(task_push, account=account, push_request=push_request, request_id=request_id, host=host, device=device) status_code, code = status.HTTP_202_ACCEPTED, 202 description, data = 'accepted for delivery', {} try: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': data}) except json.decoder.JSONDecodeError: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': {}}) else: storage = TokenStorage() try: storage_data = storage[account] except StorageError: error = HTTPException(status_code=500, detail="Internal error: storage") log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=f'500: {{\"detail\": \"{error.detail}\"}}') raise error expired_devices = [] log_push_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) if not storage_data: description, data = 'Push request was not sent: user not found', {"account": account} + storage.remove(account) return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={'code': 404, 'description': description, 'data': data}) for device_key, push_parameters in storage_data.items(): if device is not None and device != push_parameters['device_id']: continue push_parameters.update(push_request.__dict__) reversed_push_parameters = {} for item in push_parameters.keys(): value = push_parameters[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') reversed_push_parameters[item] = value # Use background_token for cancel if push_parameters['event'] == 'cancel' and push_parameters['background_token'] is not None: reversed_push_parameters['token'] = push_parameters['background_token'] try: wp = WakeUpRequest(**reversed_push_parameters) except ValidationError as e: error_msg = e.errors()[0]['msg'] log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=error_msg) content = jsonable_encoder({'code': 400, 'description': error_msg, 'data': ''}) return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=content) log_incoming_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp.__dict__) results = handle_request(wp, request_id=request_id) code = results.get('code') if code == 410: expired_devices.append((push_parameters['app_id'], push_parameters['device_id'])) code = 200 description = 'push notification responses' data.append(results) for expired_device in expired_devices: msg = f'Removing {expired_device[1]} from {account}' log_event(loggers=settings.params.loggers, msg=msg, level='info') storage.remove(account, *expired_device) if code == '': description, data = 'Push request was not sent: device not found', {"device_id": push_parameters['device_id']} content = {'code': 404, 'description': description, 'data': data} log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=f'{content}') return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=content) else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} log_event(loggers=settings.params.loggers, sg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index 8da6957..560917f 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,178 +1,179 @@ import os import _pickle as pickle import logging from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig from .errors import StorageError __all__ = 'TokenStorage' CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): token = contact_params.token background_token = None if contact_params.platform == 'apple': try: (token, background_token) = contact_params.token.split('-') except ValueError: pass contact_params.device_id = contact_params.device_id.strip("") data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token key = f'{contact_params.app_id}-{contact_params.device_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} try: for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'background_token': device.background_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} except CQLEngineException as e: log_event(loggers=settings.params.loggers, msg=f'Get token(s) failed: {e}', level='error') raise StorageError return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) token = contact_params.token background_token = None if contact_params.platform == 'apple': try: (token, background_token) = contact_params.token.split('-') except ValueError: pass contact_params.device_id = contact_params.device_id.strip("") try: PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, device_token=token, background_token=background_token, platform=contact_params.platform, silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, user_agent=contact_params.user_agent) except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=f'Storing token failed: {e}', level='error') raise StorageError try: OpenSips.create(opensipskey=account, opensipsval='1') except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=e, level='error') raise StorageError - def remove(self, account, app_id, device_id): + def remove(self, account, app_id='', device_id=''): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass - else: - # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS - if not self[account]: - try: - OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() - except LWTException: - pass + + # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS + if not self[account]: + try: + OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() + except LWTException: + pass + class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: if CassandraConfig.debug: logging.getLogger('cassandra').setLevel(logging.DEBUG) else: logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage()