diff --git a/config/general.ini.sample b/config/general.ini.sample index 7975de3..ed7d95f 100644 --- a/config/general.ini.sample +++ b/config/general.ini.sample @@ -1,60 +1,61 @@ ; The values after the ; are the default values, uncomment them only if you ; want to make changes [server] ; host = 0.0.0.0 ; port = 8400 ; The file containing X.509 certificate and private key in unencrypted format ; If a certificate is set, the server will listen using TLS ; tls_certificate = '' ; by default the server will respond to the client after the outgoing ; request for the push notification is completed. If false, the server will ; reply imediately with 202. The result of the push notification can then ; be found only in the logs. This is designed for client that can block and ; cannot or do not want to wait for the push operation to be completed ; return_async = true ; by default any client is allowed to send requests to the server ; IP addresses and networks in CIDR notation are supported ; e.g: 10.10.10.0/24, 127.0.0.1, 192.168.1.2 ; allowed_hosts = [] ; by default logs go to the journal; uncomment below to also log to a file +; log_to_file = true ; log_file = /var/log/sylk-pushserver/push.log ; Base directory for files created by the token storage ; spool_dir = /var/spool/sylk-pushserver ; If debug is true, headers and payloads for the outgoing requests will also ; be logged ; debug = False [applications] ; paths are relative to the config directory, by default /etc/sylk-pushserver ; and if missing ./config from the curent directory ; mobile applications are configured in this file ; config_file = applications.ini ; credentials relative paths are relative to this directory ; credentials_folder = credentials ; more applications can be added to this directory ; extra_applications_dir = applications/ ; more pns can be added to this directory ; extra_pns_dir = pns/ [Cassandra] ; configuration for token storage to use a Cassandra cluster ; if nothing is set here it will use a 
pickle file to store the tokens if ; API version 2 is used ; Contact points to cassandra cluster ; cluster_contact_points = ; Keyspace to use to retrieve tokens ; keyspace = diff --git a/pushserver/api/routes/push.py b/pushserver/api/routes/push.py index 574505d..e5a8ccd 100644 --- a/pushserver/api/routes/push.py +++ b/pushserver/api/routes/push.py @@ -1,81 +1,80 @@ import json from fastapi import APIRouter, BackgroundTasks, Request, status from fastapi.responses import JSONResponse from pushserver.models.requests import WakeUpRequest, fix_platform_name from pushserver.resources import settings from pushserver.resources.notification import handle_request from pushserver.resources.utils import (check_host, log_event, log_incoming_request) router = APIRouter() @router.post('', response_model=WakeUpRequest) async def push_requests(request: Request, wp_request: WakeUpRequest, background_tasks: BackgroundTasks): wp_request.platform = fix_platform_name(wp_request.platform) host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): request_id = f"{wp_request.event}-{wp_request.app_id}-{wp_request.call_id}" if not settings.params.return_async: background_tasks.add_task(log_incoming_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp_request.__dict__) background_tasks.add_task(log_incoming_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp_request.__dict__) background_tasks.add_task(handle_request, wp_request=wp_request, request_id=request_id) status_code, code = status.HTTP_202_ACCEPTED, 202 description, data = 'accepted for delivery', {} try: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': data}) except json.decoder.JSONDecodeError: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 
'data': {}}) else: log_incoming_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp_request.__dict__) log_incoming_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp_request.__dict__) results = handle_request(wp_request, request_id=request_id) code = results.get('code') description = 'push notification response' data = results else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} - if settings.params.loggers['debug']: - log_event(loggers=settings.params.loggers, - msg=msg, level='deb', to_file=True) + log_event(loggers=settings.params.loggers, + msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/api/routes/v2/add.py b/pushserver/api/routes/v2/add.py index ff28792..6b8fe16 100644 --- a/pushserver/api/routes/v2/add.py +++ b/pushserver/api/routes/v2/add.py @@ -1,66 +1,65 @@ from fastapi import APIRouter, BackgroundTasks, Request from fastapi.responses import JSONResponse from pushserver.models.requests import AddRequest, fix_platform_name, AddResponse from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.utils import (check_host, log_event, log_add_request) router = APIRouter() @router.post('/{account}', response_model=AddResponse) async def add_requests(account: str, request: Request, add_request: AddRequest, background_tasks: BackgroundTasks): add_request.platform = fix_platform_name(add_request.platform) host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): request_id = f"{add_request.app_id}-{add_request.device_id}" if not settings.params.return_async: 
background_tasks.add_task(log_add_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) background_tasks.add_task(log_add_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) storage = TokenStorage() background_tasks.add_task(storage.add, account, add_request) return add_request else: log_add_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) log_add_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=add_request.__dict__) storage = TokenStorage() storage.add(account, add_request) return add_request else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} - if settings.params.loggers['debug']: - log_event(loggers=settings.params.loggers, - msg=msg, level='deb', to_file=True) + log_event(loggers=settings.params.loggers, + msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/api/routes/v2/push.py b/pushserver/api/routes/v2/push.py index 488fa3a..39e2046 100644 --- a/pushserver/api/routes/v2/push.py +++ b/pushserver/api/routes/v2/push.py @@ -1,130 +1,129 @@ import json from fastapi import APIRouter, BackgroundTasks, Request, status from fastapi.responses import JSONResponse from fastapi.encoders import jsonable_encoder from pydantic import ValidationError from pushserver.models.requests import WakeUpRequest, PushRequest from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.notification import handle_request from pushserver.resources.utils import (check_host, log_event, log_incoming_request, log_push_request) router = 
APIRouter() @router.post('/{account}/push', response_model=PushRequest) async def push_requests(account: str, request: Request, push_request: PushRequest, background_tasks: BackgroundTasks): host, port = request.client.host, request.client.port code, description, data = '', '', [] if check_host(host, settings.params.allowed_pool): request_id = f"{push_request.event}-{account}-{push_request.call_id}" if not settings.params.return_async: background_tasks.add_task(log_push_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(log_incoming_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) background_tasks.add_task(handle_request, wp_request=push_request, request_id=request_id) status_code, code = status.HTTP_202_ACCEPTED, 202 description, data = 'accepted for delivery', {} try: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': data}) except json.decoder.JSONDecodeError: return JSONResponse(status_code=status_code, content={'code': code, 'description': description, 'data': {}}) else: storage = TokenStorage() storage_data = storage[account] expired_devices = [] log_push_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__) if not storage_data: description, data = 'Push request was not sent: user not found', {"account": account} return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={'code': 404, 'description': description, 'data': data}) for device, push_parameters in storage_data.items(): push_parameters.update(push_request.__dict__) reversed_push_parameters = {} for item in push_parameters.keys(): value = push_parameters[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') reversed_push_parameters[item] = value try: wp = 
WakeUpRequest(**reversed_push_parameters) except ValidationError as e: error_msg = e.errors()[0]['msg'] log_push_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=push_request.__dict__, error_msg=error_msg) content = jsonable_encoder({'code': 400, 'description': error_msg, 'data': ''}) return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=content) log_incoming_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=wp.__dict__) results = handle_request(wp, request_id=request_id) code = results.get('code') if code == 410: expired_devices.append(device) code = 200 description = 'push notification responses' data.append(results) for device in expired_devices: msg = f'Removing {device} from {account}' log_event(loggers=settings.params.loggers, msg=msg, level='deb') storage.remove(account, device) else: msg = f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} - if settings.params.loggers['debug']: - log_event(loggers=settings.params.loggers, - msg=msg, level='deb', to_file=True) + log_event(loggers=settings.params.loggers, + msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/api/routes/v2/remove.py b/pushserver/api/routes/v2/remove.py index 3fa3095..6f2f692 100644 --- a/pushserver/api/routes/v2/remove.py +++ b/pushserver/api/routes/v2/remove.py @@ -1,90 +1,89 @@ from fastapi import APIRouter, BackgroundTasks, Request, status from fastapi.responses import JSONResponse from pushserver.models.requests import RemoveRequest, RemoveResponse from pushserver.resources import settings from pushserver.resources.storage import TokenStorage from pushserver.resources.utils import (check_host, log_event, log_remove_request) router = APIRouter()
@router.delete('/{account}', response_model=RemoveResponse) async def remove_requests(account: str, request: Request, rm_request: RemoveRequest, background_tasks: BackgroundTasks): host, port = request.client.host, request.client.port code, description, data = '', '', {} if check_host(host, settings.params.allowed_pool): request_id = f"{rm_request.device_id}-{rm_request.app_id}" if not settings.params.return_async: background_tasks.add_task(log_remove_request, task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) background_tasks.add_task(log_remove_request, task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() background_tasks.add_task(storage.remove, account, request_id) return rm_request else: log_remove_request(task='log_request', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) storage = TokenStorage() storage_data = storage[account] print(storage_data) if not storage_data: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="User not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'User not found'} ) try: device = storage_data[request_id] except KeyError: log_remove_request(task='log_failure', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__, error_msg="Device or app_id not found in token storage") return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={'result': 'Not found'} ) else: storage.remove(account, request_id) log_remove_request(task='log_success', host=host, loggers=settings.params.loggers, request_id=request_id, body=rm_request.__dict__) msg = f'Removing {device}' log_event(loggers=settings.params.loggers, msg=msg, level='deb') return rm_request else: msg = 
f'incoming request from {host} is denied' log_event(loggers=settings.params.loggers, msg=msg, level='deb') code = 403 description = 'access denied by access list' data = {} - if settings.params.loggers['debug']: - log_event(loggers=settings.params.loggers, - msg=msg, level='deb', to_file=True) + log_event(loggers=settings.params.loggers, + msg=msg, level='deb') return JSONResponse(status_code=code, content={'code': code, 'description': description, 'data': data}) diff --git a/pushserver/pns/apple.py b/pushserver/pns/apple.py index d7b70ab..086b369 100644 --- a/pushserver/pns/apple.py +++ b/pushserver/pns/apple.py @@ -1,404 +1,403 @@ import json import os import socket import ssl import time import hyper from hyper import HTTP20Connection, tls from pushserver.models.requests import WakeUpRequest from pushserver.resources.utils import log_event, ssl_cert from pushserver.pns.base import PNS, PushRequest, PlatformRegister class ApplePNS(PNS): """ An Apple Push Notification service """ def __init__(self, app_id: str, app_name: str, url_push: str, voip: bool, cert_file: str, key_file: str): """ :param app_id: `str`, blunde id provided by application. :param url_push: `str`, URI to push a notification (from applications.ini) :param cert_file `str`: path to APNS certificate (provided by dev app kit) :param key_file `str`: path to APNS key (provided by dev app kit) :param voip: `bool`, Required for apple, `True` for voip push notification type. """ self.app_id = app_id self.app_name = app_name self.url_push = url_push self.voip = voip self.key_file = key_file self.cert_file = cert_file class AppleConn(ApplePNS): """ An Apple connection """ def __init__(self, app_id: str, app_name: str, url_push: str, voip: bool, cert_file: str, key_file: str, apple_pns: PNS, loggers: dict, port: int = 443): """ :param apple_pns `ApplePNS`: Apple Push Notification Service. :param port `int`: 443 or 2197 to allow APNS traffic but block other HTTP traffic. 
:param loggers: `dict` global logging instances to write messages (params.loggers) :attribute ssl_context `ssl.SSLContext`: generated with a valid apple certificate. :attribute connection `HTTP20Connection`: related to an app and its corresponding certificate. """ self.app_id = app_id self.app_name = app_name self.url_push = url_push self.voip = voip self.key_file = key_file self.cert_file = cert_file self.apple_pns = apple_pns self.port = port self.loggers = loggers @property def ssl_context(self) -> ssl.SSLContext: """ Define a ssl context using a cert_file to open a connection requires a valid certificate file :return: a ssl.SSLContext object """ cert_file = self.cert_file key_file = self.key_file if self.key_file else self.cert_file ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE ssl_context.load_cert_chain(keyfile=key_file, certfile=cert_file) return ssl_context @property def connection(self) -> HTTP20Connection: """ Open an apple connection requires a ssl context :return: an hyper.http20.connection.HTTP20Connection object """ host = self.url_push port = self.port ssl_context = self.ssl_context connection = HTTP20Connection(host=host, port=port, ssl_context=ssl_context, force_proto=tls.H2C_PROTOCOL) cert_file_name = self.cert_file.split('/')[-1] key_file_name = self.key_file.split('/')[-1] if self.key_file else None - if self.loggers['debug']: - if key_file_name: - msg = f'{self.app_name.capitalize()} app: Connecting to {host}:{port} ' \ - f'using {cert_file_name} certificate ' \ - f'and {key_file_name} key files' - else: - msg = f'{self.app_name.capitalize()} app: Connecting to {host}:{port} ' \ - f'using {cert_file_name} certificate' + if key_file_name: + msg = f'{self.app_name.capitalize()} app: Connecting to {host}:{port} ' \ + f'using {cert_file_name} certificate ' \ + f'and {key_file_name} key files' + else: + msg = f'{self.app_name.capitalize()} app: Connecting to {host}:{port} ' \ + 
f'using {cert_file_name} certificate' - log_event(loggers=self.loggers, msg=msg, level='deb') + log_event(loggers=self.loggers, msg=msg, level='deb') return connection class AppleRegister(PlatformRegister): def __init__(self, app_id: str, app_name:str, voip: bool, credentials_path: str, config_dict: dict, loggers: dict): self.app_id = app_id self.app_name = app_name self.voip = voip self.credentials_path = credentials_path self.config_dict = config_dict self.loggers = loggers self.error = '' @property def url_push(self) -> str: try: return self.config_dict['apple_push_url'] except KeyError: self.error = 'apple_push_url not found in applications.ini' return None @property def certificate(self) -> dict: if self.error: return {} else: try: cert_file = f"{self.credentials_path}/" \ f"{self.config_dict['apple_certificate']}" cert_exists = os.path.exists(cert_file) if not cert_exists: self.error = f"{cert_file} - no such file." return {} else: return {'cert_file': cert_file, 'cert_exists': cert_exists} except KeyError: self.error = 'apple_certificate not found in applications.ini' return {} @property def key(self) -> dict: if self.error: return {} try: key_file = f"{self.credentials_path}/" \ f"{self.config_dict['apple_key']}" key_exists = os.path.exists(key_file) if not key_exists: self.error = f"{key_file} - no such file." return {} except KeyError: return {} return {'key_file': key_file, 'key_exists': key_exists} @property def ssl_valid_cert(self) -> bool: if self.error: return else: try: cert_file = self.certificate.get('cert_file') key_file = self.key.get('key_file') if not (cert_file or key_file): self.error = 'An apple certificate/key is needed to open a connection' elif not ssl_cert(cert_file, key_file): self.error = f"{cert_file} - bad ssl certificate." 
return else: return True except FileNotFoundError as exc: self.error = exc return @property def apple_pns(self) -> ApplePNS: if self.error: return if self.ssl_valid_cert: cert_file = self.certificate.get('cert_file') key_file = self.key.get('key_file') return ApplePNS(app_id=self.app_id, app_name=self.app_name, url_push=self.url_push, voip=self.voip, cert_file=cert_file, key_file=key_file) @property def apple_conn(self): if self.error: return return AppleConn(app_id=self.app_id, app_name=self.app_name, url_push=self.url_push, voip=self.voip, cert_file=self.certificate.get('cert_file'), key_file=self.key.get('key_file'), apple_pns=self.apple_pns, loggers=self.loggers).connection @property def register_entries(self): if self.error: return {} return {'pns': self.apple_pns, 'conn': self.apple_conn} class ApplePushRequest(PushRequest): """ Apple push notification request """ def __init__(self, error: str, app_name: str, app_id: str, request_id: str, headers: str, payload: dict, loggers: dict, log_remote: dict, wp_request: WakeUpRequest, register: dict): """ :param error: `str` :param app_name: `str` 'linphone' or 'payload' :param app_id: `str` bundle id :param headers: `AppleHeaders` Apple push notification headers :param payload: `ApplePayload` Apple push notification payload :param wp_request: `WakeUpRequest` :param loggers: `dict` global logging instances to write messages (params.loggers) """ self.error = error self.app_name = app_name self.app_id = app_id self.platform = 'apple' self.request_id = request_id self.headers = headers self.payload = payload self.token = wp_request.token self.call_id = wp_request.call_id self.media_type = wp_request.media_type self.wp_request = wp_request self.loggers = loggers self.log_remote = log_remote self.apple_pns = register['pns'] self.connection = register['conn'] self.path = f'/3/device/{self.token}' self.results = self.send_notification() def send_notification(self) -> dict: """ Send an apple push requests to a single device. 
If status of response is like 5xx, an exponential backoff factor is implemented to retry the notification according to media type. :param `hstr` token: destination device. :param `str` method: HTTP request method, must be 'POST'. :param `AppleHeaders` headers: Apple push notification headers. :param `ApplePayload` payload: Apple push notification payload. """ if self.error: self.log_error() return {'code': 500, 'body': {}, 'reason': 'Internal server error'} n_retries, backoff_factor = self.retries_params(self.media_type) log_path = f'http://{self.apple_pns.url_push}{self.path}' status_forcelist = tuple([status for status in range(500, 600)]) counter = 0 status = 500 reason = '' body = {} while counter <= n_retries: if self.connection: try: self.log_request(path=log_path) self.connection.request('POST', self.path, self.payload, self.headers) response = self.connection.get_response() reason_str = response.read().decode('utf8').replace("'", '"') if reason_str: reason_json = json.loads(reason_str) reason = reason_json.get('reason') else: reason = reason_str status = response.status if status not in status_forcelist: break except socket.gaierror: reason = 'socket error' except hyper.http20.exceptions.StreamResetError: reason = 'stream error' except ValueError as err: reason = f'Bad type of object in headers or payload: {err}' break else: reason = 'no connection' counter += 1 timer = backoff_factor * (2 ** (counter - 1)) time.sleep(timer) if counter == n_retries: reason = 'max retries reached' url = f'https:{self.connection.host}:{self.connection.port}{self.path}' if status != 200: details = self.apple_error_info(reason) if details: reason = f'{reason} - {details}' if status == 400 and 'BadDeviceToken' in reason: status = 410 results = {'body': body, 'code': status, 'reason': reason, 'url': url, 'platform': 'apple', 'call_id': self.call_id, 'token': self.token } self.results = results self.log_results() return results def apple_error_info(self, reason): """ Give a human 
readable message according to 'reason' from apple APN. :returns : a string with message according to reason """ description_codes = {'ConnectionFailed': 'There was an error connecting to APNs.', 'InternalException': 'This exception should not be raised. If it is, please report this as a bug.', 'BadPayloadException': 'Something bad with the payload.', 'BadCollapseId': 'The collapse identifier exceeds the maximum allowed size', 'BadDeviceToken': 'The specified device token was bad. Verify that the request contains a valid token and that the token matches the environment.', 'BadExpirationDate:': 'The apns-expiration value is bad.', 'BadMessageId': 'The apns-id value is bad.', 'BadPriority': 'The apns-priority value is bad.', 'BadTopic': 'The apns-topic was invalid.', 'DeviceTokenNotForTopic': 'The device token does not match the specified topic.', 'DuplicateHeaders': 'One or more headers were repeated.', 'IdleTimeout': 'Idle time out.', 'MissingDeviceToken': 'The device token is not specified in the request :path. Verify that the :path header contains the device token.', 'MissingTopic': 'The apns-topic header of the request was not specified and was required. 
The apns-topic header is mandatory when the client is connected using a certificate that supports multiple topics.', 'PayloadEmpty': 'The message payload was empty.', 'TopicDisallowed': 'Pushing to this topic is not allowed.', 'BadCertificate': 'The certificate was bad.', 'BadCertificateEnvironment': 'The client certificate was for the wrong environment.', 'ExpiredProviderToken': 'The provider token is stale and a new token should be generated.', 'Forbidden': 'The specified action is not allowed.', 'InvalidProviderToken': 'The provider token is not valid or the token signature could not be verified.', 'MissingProviderToken': 'No provider certificate was used to connect to APNs and Authorization header was missing or no provider token was specified.', 'BadPath': 'The request contained a bad :path value.', 'MethodNotAllowed': 'The specified :method was not POST.', 'Unregistered': 'The device token is inactive for the specified topic.', 'PayloadTooLarge': 'The message payload was too large. 
The maximum payload size is 4096 bytes.', 'TooManyProviderTokenUpdates': 'The provider token is being updated too often.', 'TooManyRequests': 'Too many requests were made consecutively to the same device token.', 'InternalServerError': 'An internal server error occurred.', 'ServiceUnavailable': 'The service is unavailable.', 'Shutdown': 'The server is shutting down.', 'InvalidPushType': 'The apns-push-type value is invalid.'} try: message = description_codes[reason] return message except KeyError: return None diff --git a/pushserver/pns/base.py b/pushserver/pns/base.py index 445824e..22e79a3 100644 --- a/pushserver/pns/base.py +++ b/pushserver/pns/base.py @@ -1,250 +1,223 @@ import concurrent import datetime import json import socket import requests from pushserver.resources.utils import log_event class PNS(object): """ Push Notification Service """ def __init__(self, app_id: str, app_name: str, url_push: str, voip: bool = False): """ :param app_id: `str`, Id provided by application. :param app_name: `str`, Application name. :param url_push: `str`, URI to push a notification. :param voip: `bool`, Required for apple, `True` for voip push notification type. 
""" self.app_id = app_id self.app_name = app_name self.url_push = url_push self.voip = voip class PlatformRegister(object): def __init__(self, config_dict, credentials_path: str, loggers: dict): self.credentials_path = credentials_path self.config_dict = config_dict self.loggers = loggers class PushRequest(object): def __init__(self, error: str, app_name: str, app_id: str, platform: str, request_id: str, headers: str, payload: dict, token: str, media_type: str, loggers: dict, log_remote: dict, wp_request: dict): self.error = error self.app_name = app_name self.app_id = app_id self.platform = platform self.request_id = request_id self.headers = headers self.payload = payload self.token = token self.media_type = media_type self.loggers = loggers self.log_remote = log_remote self.wp_request = wp_request results = {} def retries_params(self, media_type: str) -> tuple: if not media_type or media_type == 'sms': n_tries = 11 else: n_tries = 7 bo_factor = 0.5 return n_tries, bo_factor def log_request(self, path: str) -> None: """ Write in log information about push notification, using log_event function :param path: `str`, path where push notification will be sent. :param app_name: `str` for friendly log. :param platform: `str`, 'apple' or 'firebase'. :param request_id: `str`, request ID generated on request event. :param headers: `json`, of push notification. :param payload: `json`, of push notification. 
:param loggers: `dict` global logging instances to write messages (params.loggers) """ # log_app_name = app_name.capitalize() log_platform = self.platform.capitalize() log_path = path if path else self.path level = 'info' msg = f'outgoing {log_platform} request {self.request_id} to {log_path}' log_event(loggers=self.loggers, msg=msg, level=level) - if self.loggers['debug']: - level = 'deb' - msg = f'outgoing {log_platform} request {self.request_id} to {log_path}' - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) + msg = f'outgoing {log_platform} request {self.request_id} headers: {self.headers}' + log_event(loggers=self.loggers, msg=msg, level='deb') - msg = f'outgoing {log_platform} request {self.request_id} headers: {self.headers}' - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) - - msg = f'outgoing {log_platform} request {self.request_id} body: {self.payload}' - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) + msg = f'outgoing {log_platform} request {self.request_id} body: {self.payload}' + log_event(loggers=self.loggers, msg=msg, level='deb') def log_error(self): level = 'error' msg = f"outgoing {self.platform.title()} response for " \ f"{self.request_id}, push failed: " \ f"{self.error}" log_event(loggers=self.loggers, msg=msg, level=level) def server_ip(self, destination): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((destination, 1)) return s.getsockname()[0] except socket.error: return None def log_remotely(self, body: dict, code: str, reason: str, url: str) -> None: """ Fork a log of a payload incoming request to a remote url :param body: `dict` response to push request :param code: `int` of response to push request :param reason: `str` of response to push request """ push_response = {'code': code, 'description': reason, 'push_url': url} headers = {'Content-Type': 'application/json'} server_ip = self.server_ip('1.2.3.4') now = datetime.datetime.now() timestamp = 
now.strftime("%Y-%m-%d %H:%M:%S") payload = {'request': body, 'response': push_response, 'server_ip': server_ip,'timestamp': timestamp} task = 'log remote' log_key = self.log_remote.get('log_key') log_time_out = self.log_remote.get('log_time_out') results = [] for log_url in self.log_remote['log_urls']: - if self.loggers['debug']: - msg = f'{task} request {self.request_id} to {log_url}' - log_event(loggers=self.loggers, msg=msg, level='info') - msg = f'{task} request {self.request_id} to {log_url} headers: {headers}' - log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) - msg = f'{task} request {self.request_id} to {log_url} body: {payload}' - log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) + msg = f'{task} request {self.request_id} to {log_url}' + log_event(loggers=self.loggers, msg=msg, level='deb') + msg = f'{task} request {self.request_id} to {log_url} headers: {headers}' + log_event(loggers=self.loggers, msg=msg, level='deb') + msg = f'{task} request {self.request_id} to {log_url} body: {payload}' + log_event(loggers=self.loggers, msg=msg, level='deb') try: with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = [ executor.submit( lambda: requests.post(url=log_url, json=payload, headers=headers, timeout=log_time_out or 2) ) for log_url in self.log_remote['log_urls'] ] results = [ f.result() for f in futures ] except requests.exceptions.ConnectionError as exc: - if self.loggers['debug']: - msg = f'{task} for {self.request_id}: connection error {exc}' - log_event(loggers=self.loggers, msg=msg, level='error') - log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) + msg = f'{task} for {self.request_id}: connection error {exc}' + log_event(loggers=self.loggers, msg=msg, level='error') except requests.exceptions.ReadTimeout as exc: - if self.loggers['debug']: - msg = f'{task} for {self.request_id}: connection error {exc}' - log_event(loggers=self.loggers, msg=msg, level='error') - 
log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) + msg = f'{task} for {self.request_id}: read timeout {exc}' + log_event(loggers=self.loggers, msg=msg, level='error') if not results: return for url, result in list(zip(self.log_remote['log_urls'], results)): code = result.status_code text = result.text[:500] if log_key: try: result = result.json() value = result.get(log_key) except (json.decoder.JSONDecodeError, AttributeError): value = {} if value: - if self.loggers['debug']: - msg = f'{task} response for request {self.request_id} from {url} - ' \ - f'{code} {log_key}: {value}' - log_event(loggers=self.loggers, msg=msg, level='info') - log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) - + msg = f'{task} response for request {self.request_id} from {url} - ' \ + f'{code} {log_key}: {value}' + log_event(loggers=self.loggers, msg=msg, level='deb') else: - if self.loggers['debug']: - msg = f'{task} response for request {self.request_id} - ' \ - f'code: {code}, key not found' - log_event(loggers=self.loggers, msg=msg, level='error') - - msg = f'{task} response for request {self.request_id} - ' \ - f'{log_key} key not found in: {text}' - log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) - + msg = f'{task} response for request {self.request_id} - ' \ + f'code: {code}, key not found' + log_event(loggers=self.loggers, msg=msg, level='error') else: - - if self.loggers['debug']: - msg = f'{task} code response for request {self.request_id} ' \ - f'from {url}: {code}' - log_event(loggers=self.loggers, msg=msg, level='info') - msg = f'{task} response for request {self.request_id} ' \ - f'from {url}: {code} {text}' - log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) + msg = f'{task} response for request {self.request_id} ' \ + f'from {url}: {code} {text}' + log_event(loggers=self.loggers, msg=msg, level='deb') def log_results(self): """ Log to journal system the result of push notification """ body
= self.results['body'] code = self.results['code'] reason = self.results['reason'] url = self.results['url'] - if self.loggers['debug']: - level = 'info' - body = json.dumps(body) - msg = f"outgoing {self.platform.title()} response for request " \ - f"{self.request_id} body: {body}" - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) + level = 'info' + body = json.dumps(body) + msg = f"outgoing {self.platform.title()} response for request " \ + f"{self.request_id} body: {body}" + log_event(loggers=self.loggers, msg=msg, level='deb') if code == 200: - level = 'info' msg = f"outgoing {self.platform.title()} response for request " \ f"{self.request_id}: push notification sent successfully" log_event(loggers=self.loggers, msg=msg, level=level) else: - level = 'error' msg = f"outgoing {self.platform.title()} response for " \ f"{self.request_id}, push failed with code {code}: {reason}" - log_event(loggers=self.loggers, msg=msg, level=level) + log_event(loggers=self.loggers, msg=msg, level='error') body = {'incoming_body': self.wp_request.__dict__, 'outgoing_headers': self.headers, 'outgoing_body': self.payload } if self.log_remote.get('log_urls'): self.log_remotely(body=body, code=code, reason=reason, url=url) diff --git a/pushserver/resources/server.py b/pushserver/resources/server.py index 50fb87d..4c0ff08 100644 --- a/pushserver/resources/server.py +++ b/pushserver/resources/server.py @@ -1,148 +1,149 @@ import asyncio import os from typing import Callable from fastapi import FastAPI from fastapi.exceptions import RequestValidationError from pushserver.api.errors.validation_error import validation_exception_handler from pushserver.api.routes.api import router from pushserver.resources import settings from pushserver.resources.utils import log_event def get_server() -> FastAPI: server = FastAPI(title='sylk-pushserver', version='1.0.0', debug=True) server.add_event_handler("startup", create_start_server_handler()) 
server.add_exception_handler(RequestValidationError, validation_exception_handler) server.include_router(router) - return server async def autoreload_read_config(wait_for: float = 0.1) -> None: """ Set global parameters when config folder changes. :param wait_for: `float` time to sleep between looks for changes. """ # thanks to lbellomo for the concept of this function to_watch = {} paths_list = [settings.params.file['path'], settings.params.apps['path'], settings.params.apps['credentials']] for path in paths_list: try: to_watch[path] = os.stat(path).st_mtime except FileNotFoundError: pass while True: for path in to_watch.keys(): last_st_mtime = to_watch[path] path_modified = last_st_mtime != os.stat(path).st_mtime if path_modified: to_watch[path] = os.stat(path).st_mtime settings.params = settings.update_params(settings.params.config_dir, settings.params.debug, settings.params.ip, settings.params.port) await asyncio.sleep(wait_for) break await asyncio.sleep(wait_for) def create_start_server_handler() -> Callable: # type: ignore wait_for = 0.1 async def start_server() -> None: asyncio.create_task(autoreload_read_config(wait_for=wait_for)) level = 'info' loggers = settings.params.loggers register = settings.params.register pns_register = register['pns_register'] - msg = f"Loaded {len(pns_register)} applications from " \ - f"{settings.params.apps['path']}:" + if settings.params.apps['path']: + msg = f"Loaded {len(pns_register)} applications from " \ + f"{settings.params.apps['path']}:" + else: + msg = f"Loaded {len(pns_register)} applications" log_event(loggers=loggers, msg=msg, level=level) for app in pns_register.keys(): app_id, platform = app name = pns_register[app]['name'] msg = f"Loaded {platform.capitalize()} "\ f"{name.capitalize()} app {app_id}" \ log_event(loggers=loggers, msg=msg, level=level) - if settings.params.loggers['debug']: - headers_class = pns_register[app]['headers_class'] - payload_class = pns_register[app]['payload_class'] + headers_class = 
pns_register[app]['headers_class'] + payload_class = pns_register[app]['payload_class'] - msg = f"{name.capitalize()} app {app_id} classes: " \ - f"{headers_class.__name__}, {payload_class.__name__}" - log_event(loggers=loggers, msg=msg, level='deb') + msg = f"{name.capitalize()} app {app_id} classes: " \ + f"{headers_class.__name__}, {payload_class.__name__}" + log_event(loggers=loggers, msg=msg, level='deb') - log_remote = pns_register[app]['log_remote'] - if log_remote['error']: - msg = f"{name.capitalize()} loading of log remote settings failed: " \ - f"{log_remote['error']}" - log_event(loggers=loggers, msg=msg, level='deb') - elif log_remote.get('log_remote_urls'): - log_settings = '' - for k, v in log_remote.items(): - if k == 'error': - continue - if k == 'log_urls': - v = ', '.join(v) - if k == 'log_remote_key' and not v: - continue - if k == 'log_remote_timeout' and not v: - continue - log_settings += f'{k}: {v} ' - msg = f'{name.capitalize()} log remote settings: {log_settings}' - log_event(loggers=loggers, msg=msg, level='deb') + log_remote = pns_register[app]['log_remote'] + if log_remote['error']: + msg = f"{name.capitalize()} loading of log remote settings failed: " \ + f"{log_remote['error']}" + log_event(loggers=loggers, msg=msg, level='warn') + elif log_remote.get('log_remote_urls'): + log_settings = '' + for k, v in log_remote.items(): + if k == 'error': + continue + if k == 'log_urls': + v = ', '.join(v) + if k == 'log_remote_key' and not v: + continue + if k == 'log_remote_timeout' and not v: + continue + log_settings += f'{k}: {v} ' + msg = f'{name.capitalize()} log remote settings: {log_settings}' + log_event(loggers=loggers, msg=msg, level='deb') invalid_apps = register['invalid_apps'] for app in invalid_apps.keys(): app_id, platform = app[0], app[1] name = invalid_apps[app]['name'] reason = invalid_apps[app]['reason'] msg = f"{name.capitalize()} app with {app_id} id for {platform} platform " \ f"will not be available, reason: {reason}" - 
log_event(loggers=loggers, msg=msg, level=level) + log_event(loggers=loggers, msg=msg, level='warn') pnses = register['pnses'] - if settings.params.loggers['debug']: - level = 'deb' + if len(pnses) == 0: + msg = f'Loaded {len(pnses)} Push notification services' + else: msg = f'Loaded {len(pnses)} Push notification services: ' \ - f'{", ".join(pnses)}' - log_event(loggers=loggers, msg=msg, level=level) + f'{", ".join(pnses)}' + log_event(loggers=loggers, msg=msg, level='deb') - for pns in pnses: - msg = f"{pns.split('PNS')[0]} Push Notification Service - " \ - f"{pns} class" - log_event(loggers=loggers, msg=msg, level=level) + for pns in pnses: + msg = f"{pns.split('PNS')[0]} Push Notification Service - " \ + f"{pns} class" + log_event(loggers=loggers, msg=msg, level='deb') if settings.params.allowed_pool: nets = [net.with_prefixlen for net in settings.params.allowed_pool] msg = f"Allowed hosts: " \ f"{', '.join(nets)}" log_event(loggers=loggers, msg=msg, level=level) - if settings.params.loggers['debug']: - msg = 'Server is now ready to answer requests' - log_event(loggers=loggers, msg=msg, level='deb') + msg = 'Server is now ready to answer requests' + log_event(loggers=loggers, msg=msg, level='deb') ip, port = settings.params.server['host'], settings.params.server['port'] msg = f'Sylk Pushserver listening on http://{ip}:{port}' log_event(loggers=loggers, msg=msg, level='info') await asyncio.sleep(wait_for) return start_server server = get_server() diff --git a/pushserver/resources/settings.py b/pushserver/resources/settings.py index f646e05..adc6e37 100644 --- a/pushserver/resources/settings.py +++ b/pushserver/resources/settings.py @@ -1,319 +1,308 @@ import configparser import logging import os from ipaddress import ip_network from pushserver.pns.register import get_pns_from_config - - -try: - from systemd.journal import JournaldLogHandler -except ImportError: - from systemd.journal import JournalHandler +from application import log class ConfigParams(object): 
""" Settings params to share across modules. :param dir: `dict` with 'path' to config dir an 'error' if exists :param file: `dict` with 'path' to config file :param server: `dict` with host, port and tls_cert from config file :param apps: `dict` with path, credentials and extra_dir from config file :param loggers: `dict` global logging instances to write messages (params.loggers) :param allowed_pool: `list` of allowed hosts for requests if there is any error with config dir or config file, others params will be setted to None. """ def __init__(self, config_dir, debug, ip, port): self.default_host, self.default_port = '127.0.0.1', '8400' self.config_dir = config_dir self.debug = debug self.ip, self.port = ip, port self.cfg_file = f'general.ini' self.dir = self.set_dir() self.file = self.set_file() self.loggers = self.set_loggers() self.apps = self.set_apps() self.register = self.set_register() self.allowed_pool = self.set_allowed_pool() self.return_async = self.set_return_async() def set_dir(self): """ if config directory was not specified from command line look for general.ini in /etc/sylk-pushserver if general.ini is not there, server will start with default settings """ dir, error = {}, '' config_dir = self.config_dir msg = f"Reading configuration from {config_dir}" - logging.info(msg) + log.info(msg) if not os.path.exists(f'{self.config_dir}/{self.cfg_file}'): config_dir = '' error = f'No {self.cfg_file} found in {self.config_dir}, ' \ f'server will run with default settings.' 
dir['path'], dir['error'] = config_dir, error return dir def set_file(self): file, path, error = {}, '', '' if not self.dir.get('error'): path = f"{self.dir['path']}/{self.cfg_file}" error = '' elif 'default' in self.dir.get('error'): path = '' error = self.dir.get('error') file['path'], file['error'] = path, error return file @property def server(self): server = {} if not self.file.get('error') or 'default' in self.file.get('error'): config = configparser.ConfigParser() config.read(self.file['path']) try: server_settings = config['server'] except KeyError: server_settings = {} if self.ip: server['host'] = self.ip else: server['host'] = server_settings.get('host') or self.default_host if self.port: server['port'] = self.port else: server['port'] = server_settings.get('port') or self.default_port server['tls_cert'] = server_settings.get('tls_certificate') or '' return server def set_apps(self): apps = {} apps_path = f'{self.config_dir}/applications.ini' apps_cred = f'{self.config_dir}/credentials' apps_extra_dir = f'{self.config_dir}/applications' pns_extra_dir = f'{self.config_dir}/pns' if self.file['path']: logging.info(f"Reading: {self.file['path']}") config = configparser.ConfigParser() config.read(self.file['path']) config_apps_path = f"{config['applications'].get('config_file')}" config_apps_cred = f"{config['applications'].get('credentials_folder')}" config_apps_extra_dir = f"{config['applications'].get('extra_applications_dir')}" config_pns_extra_dir = f"{config['applications'].get('extra_pns_dir')}" paths_list = [config_apps_path, config_apps_cred, config_apps_extra_dir, config_pns_extra_dir] for i, path in enumerate(paths_list): if not path.startswith('/'): paths_list[i] = f'{self.config_dir}/{path}' config_apps_path = paths_list[0] config_apps_cred = paths_list[1] config_apps_extra_dir = paths_list[2] config_pns_extra_dir = paths_list[3] apps_path_exists = os.path.exists(config_apps_path) cred_path_exists = os.path.exists(config_apps_cred) 
extra_apps_dir_exists = os.path.exists(config_apps_extra_dir) extra_pns_dir_exists = os.path.exists(config_pns_extra_dir) if apps_path_exists: apps_path = config_apps_path if cred_path_exists: apps_cred = config_apps_cred if extra_apps_dir_exists: apps_extra_dir = config_apps_extra_dir if extra_pns_dir_exists: pns_extra_dir = config_pns_extra_dir else: logging.info(self.dir['error']) if not os.path.exists(apps_path): self.dir['error'] = f'Required config file not found: {apps_path}' apps_path, apps_cred, apps_extra_dir = '', '', '' else: logging.info(f'Reading: {apps_path}') config = configparser.ConfigParser() config.read(apps_path) if config.sections(): for id in config.sections(): try: config[id]['app_id'] config[id]['app_type'] config[id]['app_platform'].lower() except KeyError: self.dir['error'] = f'Can not start: ' \ f'{apps_path} config file has not ' \ f'valid application settings' apps_path, apps_cred, apps_extra_dir = '', '', '' apps['path'] = apps_path apps['credentials'] = apps_cred apps['apps_extra_dir'] = apps_extra_dir apps['pns_extra_dir'] = pns_extra_dir return apps def set_loggers(self): debug = self.debug if self.debug else False loggers = {} config = configparser.ConfigParser() default_path = '/var/log/sylk-pushserver/push.log' log_path = '' if not self.file['error']: config.read(self.file['path']) try: - log_path = f"{config['server']['log_file']}" - try: - str_debug = config['server']['debug'].lower() - except KeyError: - str_debug = False - debug = True if str_debug == 'true' else False - debug = debug or self.debug + log_to_file = f"{config['server']['log_to_file']}" + log_to_file = True if log_to_file.lower() == 'true' else False except KeyError: - log_path, debug = default_path, False + pass + else: + if log_to_file: + try: + log_path = f"{config['server']['log_file']}" + except KeyError: + log_path = default_path - logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + try: + str_debug = 
config['server']['debug'].lower() + except KeyError: + str_debug = False + debug = True if str_debug == 'true' else False + debug = debug or self.debug - formatter = logging.Formatter('[%(levelname)s] %(message)s') + formatter = logging.Formatter('%(asctime)s [%(levelname)-8s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + logger_journal = logging.getLogger() + + loggers['to_journal'] = logger_journal - # log to file if log_path: - logger_file = logging.getLogger('to_file') - logger_file.setLevel(logging.DEBUG) try: - loggers['to_file'] = logger_file hdlr = logging.FileHandler(log_path) hdlr.setFormatter(formatter) - logger_file.addHandler(hdlr) + hdlr.setLevel(logging.DEBUG) + logger_journal.addHandler(hdlr) except PermissionError: - self.dir['error'] = f'Permission denied: {log_path}, ' \ - f'debug log file requires ' \ - f'run sylk-pushserver with sudo.' - # log to journal - logger_journal = logging.getLogger('to_journal') - logger_journal.setLevel(logging.DEBUG) - - loggers['to_journal'] = logger_journal - - try: - journal_handler = JournaldLogHandler() - except NameError: - journal_handler = JournalHandler() - - journal_handler.setFormatter(formatter) - logger_journal.addHandler(journal_handler) + log.warning(f'Permission denied for log file: {log_path}, ' \ + f'logging will only be in the journal or foreground') debug = debug or self.debug - loggers['debug'] = debug + + if debug: + logger_journal.setLevel(logging.DEBUG) return loggers def set_register(self): if not self.dir['error'] or 'default' in self.dir['error']: apps_path, apps_cred = self.apps['path'], self.apps['credentials'] apps_extra_dir = self.apps['apps_extra_dir'] pns_extra_dir = self.apps['pns_extra_dir'] return get_pns_from_config(config_path=apps_path, credentials=apps_cred, apps_extra_dir=apps_extra_dir, pns_extra_dir=pns_extra_dir, loggers=self.loggers) @property def pns_register(self): return self.register['pns_register'] @property def invalid_apps(self): return self.register['invalid_apps'] 
@property def pnses(self): return self.register['pnses'] def set_allowed_pool(self): if self.dir['error']: return None if not self.file['path']: return None allowed_pool = [] allowed_hosts_str = '' config = configparser.ConfigParser() config.read(self.file['path']) try: allowed_hosts_str = config['server']['allowed_hosts'] allowed_hosts = allowed_hosts_str.split(', ') except KeyError: return allowed_pool except SyntaxError: error = f'allowed_hosts = {allowed_hosts_str} - bad syntax' self.dir['error'] = error return allowed_pool if type(allowed_hosts) not in (list, tuple): error = f'allowed_hosts = {allowed_hosts} - bad syntax' self.dir['error'] = error return allowed_pool config.read(self.file['path']) for addr in allowed_hosts: try: net = f'{addr}/32' if '/' not in addr else addr allowed_pool.append(ip_network(net)) except ValueError as e: error = f'wrong acl settings: {e}' self.dir['error'] = error return [] return set(allowed_pool) def set_return_async(self): return_async = True config = configparser.ConfigParser() if not self.file['error']: config.read(self.file['path']) try: return_async = config['server']['return_async'] return_async = True if return_async.lower() == 'true' else False except KeyError: return_async = True return return_async def init(config_dir, debug, ip, port): global params params = ConfigParams(config_dir, debug, ip, port) return params def update_params(config_dir, debug, ip, port): global params try: params = ConfigParams(config_dir, debug, ip, port) except Exception as ex: print(f'Settings can not be updated, reason: {ex}') return params diff --git a/pushserver/resources/utils.py b/pushserver/resources/utils.py index e447bd5..397f2bc 100644 --- a/pushserver/resources/utils.py +++ b/pushserver/resources/utils.py @@ -1,388 +1,373 @@ import hashlib import json import logging import socket import ssl import time from ipaddress import ip_address from datetime import datetime __all__ = ['callid_to_uuid', 'fix_non_serializable_types', 
'resources_available', 'ssl_cert', 'try_again', 'check_host', 'log_event', 'fix_device_id', 'fix_platform_name', 'log_incoming_request'] def callid_to_uuid(call_id: str) -> str: """ Generate a UUIDv4 from a callId. UUIDv4 format: five segments of seemingly random hex data, beginning with eight hex characters, followed by three four-character strings, then 12 characters at the end. These segments are separated by a “-”. :param call_id: `str` Globally unique identifier of a call. :return: a str with a uuidv4. """ hexa = hashlib.md5(call_id.encode()).hexdigest() uuidv4 = '%s-%s-%s-%s-%s' % \ (hexa[:8], hexa[8:12], hexa[12:16], hexa[16:20], hexa[20:]) return uuidv4 def fix_non_serializable_types(obj): """ Converts a non serializable object in an appropriate one, if it is possible and in a recursive way. If not, return the str 'No JSON Serializable object' :param obj: obj to convert """ if isinstance(obj, bytes): string = obj.decode() return fix_non_serializable_types(string) elif isinstance(obj, dict): return { fix_non_serializable_types(k): fix_non_serializable_types(v) for k, v in obj.items() } elif isinstance(obj, (tuple, list)): return [fix_non_serializable_types(elem) for elem in obj] elif isinstance(obj, str): try: dict_obj = json.loads(obj) return fix_non_serializable_types(dict_obj) except json.decoder.JSONDecodeError: return obj elif isinstance(obj, (bool, int, float)): return obj else: return def resources_available(host: str, port: int) -> bool: """ Check if a pair ip, port is available for a connection :param: `str` host :param: `int` port :return: a `bool` according to the test result. """ serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if not host or not port: return None try: serversocket.bind((host, port)) serversocket.close() return True except OSError: return False def ssl_cert(cert_file: str, key_file=None) -> bool: """ Check if a ssl certificate is valid. 
:param cert_file: `str` path to certificate file :param key_file: `str` path to key file :return: `bool` True for a valid certificate. """ ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE try: ssl_context.load_cert_chain(certfile=cert_file, keyfile=key_file) return True except (ssl.SSLError, NotADirectoryError, TypeError): return False def try_again(timer: int, host: str, port: int, start_error: str, loggers: dict) -> None: """ Sleep for a specific time and send log messages in case resources would not be available to start the app. :param timer: `int` time in seconds to wait (30 = DHCP delay) :param host: `str` IP address where app is trying to run :param port: `int` Host where app is trying to run :param start_error: `stṛ` Error msg to show in log. :param loggers: global logging instances to write messages (params.loggers) """ timer = timer # seconds, 30 for dhcp delay. level = 'error' msg = f"[can not init] on {host}:{port} - resources are not available" log_event(msg=start_error, level=level, loggers=loggers) log_event(msg=msg, level=level, loggers=loggers) msg = f'Server will try again in {timer} seconds' log_event(msg=msg, level=level, loggers=loggers) time.sleep(timer) def check_host(host, allowed_hosts) -> bool: """ Check if a host is in allowed_hosts :param host: `str` to check :return: `bool` """ if not allowed_hosts: return True for subnet in allowed_hosts: if ip_address(host) in subnet: return True return False - -def log_event(loggers: dict, msg: str, level: str = 'deb', - to_file: bool = False) -> None: +def log_event(loggers: dict, msg: str, level: str = 'deb') -> None: """ Write log messages into log file and in journal if specified. 
:param loggers: `dict` global logging instances to write messages (params.loggers) :param msg: `str` message to write :param level: `str` info, error, deb or warn :param to_file: `bool` write just in file if True """ - - if loggers.get('to_file'): - logger = loggers.get('to_file') - else: - logger = loggers.get('to_journal') - - if len(logger.handlers) > 1: - logger.handlers = [logger.handlers[0]] - - msg = f'{datetime.now()} {msg}' + logger = loggers.get('to_journal') if level == 'info': - logger.setLevel(logging.INFO) logger.info(msg) elif level == 'error': - logger.setLevel(logging.ERROR) logger.error(msg) elif level == 'warn': - logger.setLevel(logging.WARNING) logger.warning(msg) elif level in ('deb', 'debug'): - logger.setLevel(logging.DEBUG) logger.debug(msg) def fix_device_id(device_id_to_fix: str) -> str: """ Remove special characters from uuid :param device_id_to_fix: `str` uuid with special characters. :return: a `str` with fixed uuid. """ if '>' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1].replace('>', '') elif ':' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1] else: uuid = device_id_to_fix device_id = uuid return device_id def fix_platform_name(platform: str) -> str: """ Fix platform name in case its value is 'android' or 'ios', replacing it for 'firebase' and 'apple' :param platform: `str` name of platform :return: a `str` with fixed name. 
""" if platform in ('firebase', 'android'): return 'firebase' elif platform in ('apple', 'ios'): return 'apple' else: return platform def fix_payload(body: dict) -> dict: payload = {} for item in body.keys(): value = body[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') payload[item] = value return payload def pick_log_function(exc, *args, **kwargs): if ('rm_request' in exc.errors()[0]["loc"][1]): return log_remove_request(**kwargs) if ('add_request' in exc.errors()[0]["loc"][1]): return log_add_request(*args, **kwargs) else: return log_incoming_request(*args, **kwargs) def log_add_request(task: str, host: str, loggers: dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. :param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ if task == 'log_request': payload = fix_payload(body) level = 'info' msg = f'{host} - Add Token - {request_id}: ' \ f'{payload}' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_success': payload = fix_payload(body) msg = f'{host} - Add Token - Response {request_id}: ' \ f'{payload}' level = 'info' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_failure': level = 'error' resp = error_msg print(resp) msg = f'{host} - Add Token Failed - Response {request_id}: ' \ f'{resp}' - log_event(loggers=loggers, msg=msg, level=level, to_file=True) + log_event(loggers=loggers, msg=msg, level=level) def log_remove_request(task: str, host: str, loggers: 
dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. :param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ if task == 'log_request': payload = fix_payload(body) level = 'info' msg = f'{host} - Remove Token - {request_id}: ' \ f'{payload}' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_success': payload = fix_payload(body) msg = f'{host} - Remove Token - Response {request_id}: ' \ f'{payload}' level = 'info' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_failure': level = 'error' resp = error_msg msg = f'{host} - Remove Token Failed - Response {request_id}: ' \ f'{resp}' - log_event(loggers=loggers, msg=msg, level=level, to_file=True) + log_event(loggers=loggers, msg=msg, level=level) def log_push_request(task: str, host: str, loggers: dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. 
:param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ sip_to = body.get('sip_to') event = body.get('event') if task == 'log_request': payload = fix_payload(body) level = 'info' msg = f'{host} - Push - {request_id}: ' \ f'{event} for {sip_to} ' \ f': {payload}' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_failure': level = 'error' resp = error_msg msg = f'{host} - Push Failed - Response {request_id}: ' \ f'{resp}' - log_event(loggers=loggers, msg=msg, level=level, to_file=True) + log_event(loggers=loggers, msg=msg, level=level) def log_incoming_request(task: str, host: str, loggers: dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. 
:param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ app_id = body.get('app_id') platform = body.get('platform') platform = platform if platform else '' sip_to = body.get('sip_to') device_id = body.get('device_id') device_id = fix_device_id(device_id) if device_id else None event = body.get('event') if task == 'log_request': payload = fix_payload(body) level = 'info' if sip_to: if device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} ' \ f'from {host}: {payload}' elif device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f' from {host}: {payload}' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_success': msg = f'incoming {platform.title()} response for {request_id}: ' \ f'push accepted' level = 'info' - log_event(msg=msg, level=level, loggers=loggers, to_file=True) + log_event(msg=msg, level=level, loggers=loggers) elif task == 'log_failure': level = 'error' resp = error_msg msg = f'incoming {platform.title()} from {host} response for {request_id}, ' \ f'push rejected: {resp}' - log_event(loggers=loggers, msg=msg, level=level, to_file=True) + log_event(loggers=loggers, msg=msg, level=level) diff --git a/sylk-pushserver b/sylk-pushserver index 54128db..90ac46c 100755 --- a/sylk-pushserver +++ b/sylk-pushserver @@ -1,112 +1,139 @@ 
#!/usr/bin/env python3 import argparse import datetime import logging import os import sys import uvicorn +from pushserver import __info__ as package_info from application.process import process - +from application import log from pushserver.resources import settings from pushserver.resources.utils import (log_event, resources_available, ssl_cert, try_again) +name = 'sylk-pushserver' +fullname = "Sylk Pushserver" logging.getLogger("uvicorn").setLevel(logging.WARN) default_dir = '/etc/sylk-pushserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--ip", default='', help="If set, server will run in its address") parser.add_argument("--port", default='', help="If set, server will run in its address") parser.add_argument("--config_dir", default=None, metavar='PATH', help="Specify a config directory that contains " "general.ini, applications.ini and " "the credentials directory, " "Default it uses '/etc/sylk-pushserver'") +parser.add_argument('--no-fork', + action='store_false', + dest='fork', + help='log and run in the foreground') + parser.add_argument("--debug", action="store_true", default=False, help="If set, log headers and body requests to log file.") args = parser.parse_args() -logging.basicConfig(level=logging.INFO) -logging.info(f'{datetime.datetime.now()} Starting Sylk Pushserver...') +if args.fork: + try: + from systemd.journal import JournaldLogHandler + except ImportError: + from systemd.journal import JournalHandler + + try: + journal_handler = JournaldLogHandler() + except NameError: + journal_handler = JournalHandler(SYSLOG_IDENTIFIER='sylk-pushserver') + + log.set_handler(journal_handler) + log.capture_output() + +root_logger = log.get_logger() +root_logger.name = name + +if not args.fork: + console_formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + 
log.set_default_formatter(console_formatter) +log.info('Starting %s %s' % (fullname, package_info.__version__)) config_dir = default_dir if args.config_dir is not None: if not os.path.exists(f'{args.config_dir}'): - logging.info(f'Specified config directory does not exist') + log.info(f'Specified config directory does not exist') sys.exit(1) config_dir = args.config_dir settings.init(config_dir, args.debug, args.ip, args.port) # Since TokenStorage config relies on the config_dir it has to be imported here process.configuration.local_directory = config_dir from pushserver.resources.storage import TokenStorage if __name__ == '__main__': if not settings.params.dir['error'] or 'default' in settings.params.dir['error']: storage = TokenStorage() storage.load() sock_available = False while not sock_available: host = settings.params.server['host'] port = int(settings.params.server['port']) tls_cert = settings.params.server['tls_cert'] sock_available = resources_available(host, port) if sock_available: if tls_cert: if ssl_cert(tls_cert): msg = f'Starting app over SSL...' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='info') uvicorn.run('pushserver.resources.app:app', host=host, port=port, ssl_certfile=tls_cert, access_log=False, log_level='error') break else: msg = f'{tls_cert} is not a valid ssl cert, app will be run without it' print(msg) log_event(loggers=settings.params.loggers, msg=msg, level='deb') uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: uvicorn.run('pushserver.resources.server:server', host=host, port=port, access_log=False, log_level='error') break else: try_again(timer=30, host=host, port=port, start_error=settings.params.dir['error'], loggers=settings.params.loggers) else: log_event(loggers=settings.params.loggers, msg=settings.params.dir['error'], level='error') print(settings.params.dir['error'])