diff --git a/pushserver/api/errors/validation_error.py b/pushserver/api/errors/validation_error.py index 52ec826..a3fc7ae 100644 --- a/pushserver/api/errors/validation_error.py +++ b/pushserver/api/errors/validation_error.py @@ -1,53 +1,60 @@ from typing import Union from fastapi import Request, status from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from pydantic import ValidationError from pushserver.resources import settings from pushserver.resources.utils import pick_log_function async def validation_exception_handler(request: Request, exc: Union[RequestValidationError, ValidationError]) -> JSONResponse: host, port = request.scope['client'][0], request.scope['client'][1] + account = None error_msg = None code = 400 status_code = status.HTTP_400_BAD_REQUEST for entry in exc.errors(): if 'found' in entry['msg'] or 'configured' in entry['msg']: status_code = status.HTTP_404_NOT_FOUND code = 404 if not error_msg: if '__root__' not in exc.errors()[0]["loc"][2]: error_msg = f'{exc.errors()[0]["msg"]}: {exc.errors()[0]["loc"][2]}' else: error_msg = exc.errors()[0]["msg"] try: request_id = f"{exc.body['event']} - " \ f"{exc.body['app-id']}-" \ f"{exc.body['call-id']}" except (KeyError, TypeError): - request_id = "unknown" + try: + account = request['path_params']['account'] + request_id = f"{account}-" \ + f"{exc.body['app-id']}-" \ + f"{exc.body['device-id']}" + except (KeyError, TypeError): + request_id = "unknown" pick_log_function(exc, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=exc.body) pick_log_function(exc, task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=exc.body, error_msg=error_msg) content = jsonable_encoder({'code': code, 'description': error_msg, 'data': ''}) return JSONResponse( status_code=status_code, content=content) diff --git a/pushserver/api/routes/v2/add.py b/pushserver/api/routes/v2/add.py index 6b8fe16..22dec9f 100644 --- a/pushserver/api/routes/v2/add.py +++ b/pushserver/api/routes/v2/add.py @@ -1,65 +1,64 @@ from fastapi import APIRouter, BackgroundTasks, Request from fastapi.responses import JSONResponse from pushserver.models.requests import AddRequest, fix_platform_name, AddResponse from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.utils import (check_host, log_event, log_add_request) router = APIRouter() @router.post('/{account}', response_model=AddResponse) async def add_requests(account: str, request: Request, add_request: AddRequest, background_tasks: BackgroundTasks): add_request.platform = fix_platform_name(add_request.platform) host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): - request_id = f"{add_request.app_id}-{add_request.device_id}" - + request_id = f"{account}-{add_request.app_id}-{add_request.device_id}" if not settings.params.return_async: background_tasks.add_task(log_add_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) background_tasks.add_task(log_add_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) storage = TokenStorage() background_tasks.add_task(storage.add, account, add_request) return add_request else: log_add_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) log_add_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) storage = TokenStorage() storage.add(account, add_request) return add_request else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} log_event(loggers=settings.params.loggers, msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/api/routes/v2/remove.py b/pushserver/api/routes/v2/remove.py index 6f2f692..f2b4315 100644 --- a/pushserver/api/routes/v2/remove.py +++ b/pushserver/api/routes/v2/remove.py @@ -1,89 +1,90 @@ from fastapi import APIRouter, BackgroundTasks, Request, status from fastapi.responses import JSONResponse from pushserver.models.requests import RemoveRequest, RemoveResponse from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.utils import (check_host, log_event, log_remove_request) router = APIRouter() @router.delete('/{account}', response_model=RemoveResponse) async def remove_requests(account: str, request: Request, rm_request: RemoveRequest, background_tasks: BackgroundTasks): host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): - request_id = f"{rm_request.device_id}-{rm_request.app_id}" + request_id = f"{account}-{rm_request.app_id}-{rm_request.device_id}" if not settings.params.return_async: background_tasks.add_task(log_remove_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) background_tasks.add_task(log_remove_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() background_tasks.add_task(storage.remove, account, request_id) return rm_request else: log_remove_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() storage_data = storage[account] print(storage_data) if not storage_data: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="User not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'User not found'} ) try: device = storage_data[request_id] except KeyError: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="Device or app_id not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'Not found'} ) else: - storage.remove(account, request_id) + device = f"{rm_request.app_id}-{rm_request.device_id}" + storage.remove(account, device) log_remove_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) msg = f'Removing {device}' log_event(loggers=settings.params.loggers, msg=msg, level='deb') return rm_request else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} log_event(loggers=settings.params.loggers, msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/resources/storage/storage.py b/pushserver/resources/storage/storage.py index 3ec5043..c22bae5 100644 --- a/pushserver/resources/storage/storage.py +++ b/pushserver/resources/storage/storage.py @@ -1,170 +1,170 @@ import os import _pickle as pickle import logging from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from pushserver.resources import settings from pushserver.resources.utils import log_event from .configuration import CassandraConfig, ServerConfig from .errors import StorageError __all__ = 'TokenStorage' CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.io import asyncioreactor from cassandra.policies import DCAwareRoundRobinPolicy from pushserver.models.cassandra import PushTokens, OpenSips if CassandraConfig.table: PushTokens.__table_name__ = CassandraConfig.table class FileStorage(object): def __init__(self): self._tokens = defaultdict() def _save(self): with open(os.path.join(ServerConfig.spool_dir.normalized, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params): try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None data = contact_params.__dict__ data['token'] = token data['background_token'] = background_token - key = f'{contact_params.device_id}-{contact_params.app_id}' + key = f'{contact_params.app_id}-{contact_params.device_id}' if account in self._tokens: self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, device): try: del self._tokens[account][device] except KeyError: pass self._save() class CassandraStorage(object): def load(self): try: connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4, connection_class=asyncioreactor.AsyncioConnection) except NoHostAvailable: msg='Not able to connect to any of the Cassandra contact points' log_event(loggers=settings.params.loggers, msg=msg, level='error') def __getitem__(self, key): def query_tokens(key): username, domain = key.split('@', 1) tokens = {} try: for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.device_id}-{device.app_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'app_id': device.app_id, 'silent': bool(int(device.silent))} except CQLEngineException as e: log_event(loggers=settings.params.loggers, msg=f'Get token(s) failed: {e}', level='error') raise StorageError return tokens return query_tokens(key) def add(self, account, contact_params): username, domain = account.split('@', 1) try: (token, background_token) = contact_params.token.split('#') except ValueError: token = contact_params.token background_token = None try: PushTokens.create(username=username, domain=domain, device_id=contact_params.device_id, device_token=token, background_token=background_token, platform=contact_params.platform, silent=str(int(contact_params.silent is True)), app_id=contact_params.app_id, user_agent=contact_params.user_agent) except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=f'Storing token failed: {e}', level='error') raise StorageError try: OpenSips.create(opensipskey=account, opensipsval='1') except (CQLEngineException, InvalidRequest) as e: log_event(loggers=settings.params.loggers, msg=e, level='error') raise StorageError def remove(self, account, device): username, domain = account.split('@', 1) - device_id, app_id = device.split('-', 1) + app_id, device_id = device.split('-', 1) print(device) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass else: # We need to check for other device_ids/app_ids before we can remove the cache value for OpenSIPS if not self[account]: try: OpenSips.objects(OpenSips.opensipskey == account).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: msg='Reading storage configuration from {}'.format(', '.join(configuration.files)) log_event(loggers=settings.params.loggers, msg=msg, level='info') makedirs(ServerConfig.spool_dir.normalized) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: if CassandraConfig.debug: logging.getLogger('cassandra').setLevel(logging.DEBUG) else: logging.getLogger('cassandra').setLevel(logging.INFO) log_event(loggers=settings.params.loggers, msg='Using Cassandra for token storage', level='info') return CassandraStorage() else: log_event(loggers=settings.params.loggers, msg='Using pickle file for token storage', level='info') return FileStorage()