diff --git a/pushserver/pns/base.py b/pushserver/pns/base.py index 54d586c..445824e 100644 --- a/pushserver/pns/base.py +++ b/pushserver/pns/base.py @@ -1,249 +1,250 @@ import concurrent import datetime import json import socket import requests from pushserver.resources.utils import log_event class PNS(object): """ Push Notification Service """ def __init__(self, app_id: str, app_name: str, url_push: str, voip: bool = False): """ :param app_id: `str`, Id provided by application. :param app_name: `str`, Application name. :param url_push: `str`, URI to push a notification. :param voip: `bool`, Required for apple, `True` for voip push notification type. """ self.app_id = app_id self.app_name = app_name self.url_push = url_push self.voip = voip class PlatformRegister(object): def __init__(self, config_dict, credentials_path: str, loggers: dict): self.credentials_path = credentials_path self.config_dict = config_dict self.loggers = loggers class PushRequest(object): def __init__(self, error: str, app_name: str, app_id: str, platform: str, request_id: str, headers: str, payload: dict, token: str, media_type: str, loggers: dict, log_remote: dict, wp_request: dict): self.error = error self.app_name = app_name self.app_id = app_id self.platform = platform self.request_id = request_id self.headers = headers self.payload = payload self.token = token self.media_type = media_type self.loggers = loggers self.log_remote = log_remote self.wp_request = wp_request results = {} def retries_params(self, media_type: str) -> tuple: if not media_type or media_type == 'sms': n_tries = 11 else: n_tries = 7 bo_factor = 0.5 return n_tries, bo_factor def log_request(self, path: str) -> None: """ Write in log information about push notification, using log_event function :param path: `str`, path where push notification will be sent. :param app_name: `str` for friendly log. :param platform: `str`, 'apple' or 'firebase'. :param request_id: `str`, request ID generated on request event. :param headers: `json`, of push notification. :param payload: `json`, of push notification. :param loggers: `dict` global logging instances to write messages (params.loggers) """ # log_app_name = app_name.capitalize() log_platform = self.platform.capitalize() log_path = path if path else self.path level = 'info' msg = f'outgoing {log_platform} request {self.request_id} to {log_path}' log_event(loggers=self.loggers, msg=msg, level=level) if self.loggers['debug']: level = 'deb' msg = f'outgoing {log_platform} request {self.request_id} to {log_path}' log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) msg = f'outgoing {log_platform} request {self.request_id} headers: {self.headers}' log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) msg = f'outgoing {log_platform} request {self.request_id} body: {self.payload}' log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) def log_error(self): level = 'error' msg = f"outgoing {self.platform.title()} response for " \ f"{self.request_id}, push failed: " \ f"{self.error}" log_event(loggers=self.loggers, msg=msg, level=level) def server_ip(self, destination): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((destination, 1)) return s.getsockname()[0] except socket.error: return None def log_remotely(self, body: dict, code: str, reason: str, url: str) -> None: """ Fork a log of a payload incoming request to a remote url :param body: `dict` response to push request :param code: `int` of response to push request :param reason: `str` of response to push request """ push_response = {'code': code, 'description': reason, 'push_url': url} headers = {'Content-Type': 'application/json'} server_ip = self.server_ip('1.2.3.4') now = datetime.datetime.now() timestamp = now.strftime("%Y-%m-%d %H:%M:%S") payload = {'request': body, 'response': push_response, 'server_ip': server_ip,'timestamp': timestamp} task = 'log remote' log_key = self.log_remote.get('log_key') log_time_out = self.log_remote.get('log_time_out') results = [] for log_url in self.log_remote['log_urls']: - msg = f'{task} request {self.request_id} to {log_url}' - log_event(loggers=self.loggers, msg=msg, level='info') - if self.loggers['debug']: + msg = f'{task} request {self.request_id} to {log_url}' + log_event(loggers=self.loggers, msg=msg, level='info') msg = f'{task} request {self.request_id} to {log_url} headers: {headers}' log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) msg = f'{task} request {self.request_id} to {log_url} body: {payload}' log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) try: with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = [ executor.submit( lambda: requests.post(url=log_url, json=payload, headers=headers, timeout=log_time_out or 2) ) for log_url in self.log_remote['log_urls'] ] results = [ f.result() for f in futures ] except requests.exceptions.ConnectionError as exc: - msg = f'{task} for {self.request_id}: connection error {exc}' - log_event(loggers=self.loggers, msg=msg, level='error') - log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) + if self.loggers['debug']: + msg = f'{task} for {self.request_id}: connection error {exc}' + log_event(loggers=self.loggers, msg=msg, level='error') + log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) except requests.exceptions.ReadTimeout as exc: - msg = f'{task} for {self.request_id}: connection error {exc}' - log_event(loggers=self.loggers, msg=msg, level='error') - log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) + if self.loggers['debug']: + msg = f'{task} for {self.request_id}: connection error {exc}' + log_event(loggers=self.loggers, msg=msg, level='error') + log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) if not results: return for url, result in list(zip(self.log_remote['log_urls'], results)): code = result.status_code text = result.text[:500] if log_key: try: result = result.json() value = result.get(log_key) except (json.decoder.JSONDecodeError, AttributeError): value = {} if value: - msg = f'{task} response for request {self.request_id} from {url} - ' \ - f'{code} {log_key}: {value}' - log_event(loggers=self.loggers, msg=msg, level='info') if self.loggers['debug']: + msg = f'{task} response for request {self.request_id} from {url} - ' \ + f'{code} {log_key}: {value}' + log_event(loggers=self.loggers, msg=msg, level='info') log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) else: - msg = f'{task} response for request {self.request_id} - ' \ - f'code: {code}, key not found' - log_event(loggers=self.loggers, msg=msg, level='error') - if self.loggers['debug']: + msg = f'{task} response for request {self.request_id} - ' \ + f'code: {code}, key not found' + log_event(loggers=self.loggers, msg=msg, level='error') + msg = f'{task} response for request {self.request_id} - ' \ f'{log_key} key not found in: {text}' log_event(loggers=self.loggers, msg=msg, level='error', to_file=True) else: - msg = f'{task} code response for request {self.request_id} ' \ - f'from {url}: {code}' - log_event(loggers=self.loggers, msg=msg, level='info') if self.loggers['debug']: + msg = f'{task} code response for request {self.request_id} ' \ + f'from {url}: {code}' + log_event(loggers=self.loggers, msg=msg, level='info') msg = f'{task} response for request {self.request_id} ' \ f'from {url}: {code} {text}' log_event(loggers=self.loggers, msg=msg, level='info', to_file=True) def log_results(self): """ Log to journal system the result of push notification """ body = self.results['body'] code = self.results['code'] reason = self.results['reason'] url = self.results['url'] if self.loggers['debug']: level = 'info' body = json.dumps(body) msg = f"outgoing {self.platform.title()} response for request " \ f"{self.request_id} body: {body}" log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) if code == 200: level = 'info' msg = f"outgoing {self.platform.title()} response for request " \ f"{self.request_id}: push notification sent successfully" log_event(loggers=self.loggers, msg=msg, level=level) else: level = 'error' msg = f"outgoing {self.platform.title()} response for " \ f"{self.request_id}, push failed with code {code}: {reason}" log_event(loggers=self.loggers, msg=msg, level=level) body = {'incoming_body': self.wp_request.__dict__, 'outgoing_headers': self.headers, 'outgoing_body': self.payload } if self.log_remote.get('log_urls'): self.log_remotely(body=body, code=code, reason=reason, url=url) diff --git a/pushserver/resources/utils.py b/pushserver/resources/utils.py index ddb5868..749357d 100644 --- a/pushserver/resources/utils.py +++ b/pushserver/resources/utils.py @@ -1,277 +1,273 @@ import hashlib import json import logging import socket import ssl import time from ipaddress import ip_address from datetime import datetime __all__ = ['callid_to_uuid', 'fix_non_serializable_types', 'resources_available', 'ssl_cert', 'try_again', 'check_host', 'log_event', 'fix_device_id', 'fix_platform_name', 'log_incoming_request'] def callid_to_uuid(call_id: str) -> str: """ Generate a UUIDv4 from a callId. UUIDv4 format: five segments of seemingly random hex data, beginning with eight hex characters, followed by three four-character strings, then 12 characters at the end. These segments are separated by a “-”. :param call_id: `str` Globally unique identifier of a call. :return: a str with a uuidv4. """ hexa = hashlib.md5(call_id.encode()).hexdigest() uuidv4 = '%s-%s-%s-%s-%s' % \ (hexa[:8], hexa[8:12], hexa[12:16], hexa[16:20], hexa[20:]) return uuidv4 def fix_non_serializable_types(obj): """ Converts a non serializable object in an appropriate one, if it is possible and in a recursive way. If not, return the str 'No JSON Serializable object' :param obj: obj to convert """ if isinstance(obj, bytes): string = obj.decode() return fix_non_serializable_types(string) elif isinstance(obj, dict): return { fix_non_serializable_types(k): fix_non_serializable_types(v) for k, v in obj.items() } elif isinstance(obj, (tuple, list)): return [fix_non_serializable_types(elem) for elem in obj] elif isinstance(obj, str): try: dict_obj = json.loads(obj) return fix_non_serializable_types(dict_obj) except json.decoder.JSONDecodeError: return obj elif isinstance(obj, (bool, int, float)): return obj else: return def resources_available(host: str, port: int) -> bool: """ Check if a pair ip, port is available for a connection :param: `str` host :param: `int` port :return: a `bool` according to the test result. """ serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if not host or not port: return None try: serversocket.bind((host, port)) serversocket.close() return True except OSError: return False def ssl_cert(cert_file: str, key_file=None) -> bool: """ Check if a ssl certificate is valid. :param cert_file: `str` path to certificate file :param key_file: `str` path to key file :return: `bool` True for a valid certificate. """ ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE try: ssl_context.load_cert_chain(certfile=cert_file, keyfile=key_file) return True except (ssl.SSLError, NotADirectoryError, TypeError): return False def try_again(timer: int, host: str, port: int, start_error: str, loggers: dict) -> None: """ Sleep for a specific time and send log messages in case resources would not be available to start the app. :param timer: `int` time in seconds to wait (30 = DHCP delay) :param host: `str` IP address where app is trying to run :param port: `int` Host where app is trying to run :param start_error: `stṛ` Error msg to show in log. :param loggers: global logging instances to write messages (params.loggers) """ timer = timer # seconds, 30 for dhcp delay. level = 'error' msg = f"[can not init] on {host}:{port} - resources are not available" log_event(msg=start_error, level=level, loggers=loggers) log_event(msg=msg, level=level, loggers=loggers) msg = f'Server will try again in {timer} seconds' log_event(msg=msg, level=level, loggers=loggers) time.sleep(timer) def check_host(host, allowed_hosts) -> bool: """ Check if a host is in allowed_hosts :param host: `str` to check :return: `bool` """ if not allowed_hosts: return True for subnet in allowed_hosts: if ip_address(host) in subnet: return True return False def log_event(loggers: dict, msg: str, level: str = 'deb', to_file: bool = False) -> None: """ Write log messages into log file and in journal if specified. :param loggers: `dict` global logging instances to write messages (params.loggers) :param msg: `str` message to write :param level: `str` info, error, deb or warn :param to_file: `bool` write just in file if True """ if loggers.get('to_file'): logger = loggers.get('to_file') else: logger = loggers.get('to_journal') if len(logger.handlers) > 1: logger.handlers = [logger.handlers[0]] msg = f'{datetime.now()} {msg}' if level == 'info': logger.setLevel(logging.INFO) logger.info(msg) elif level == 'error': logger.setLevel(logging.ERROR) logger.error(msg) elif level == 'warn': logger.setLevel(logging.WARNING) logger.warning(msg) - elif level == 'deb': + elif level in ('deb', 'debug'): logger.setLevel(logging.DEBUG) logger.debug(msg) def fix_device_id(device_id_to_fix: str) -> str: """ Remove special characters from uuid :param device_id_to_fix: `str` uuid with special characters. :return: a `str` with fixed uuid. """ if '>' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1].replace('>', '') elif ':' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1] else: uuid = device_id_to_fix device_id = uuid return device_id def fix_platform_name(platform: str) -> str: """ Fix platform name in case its value is 'android' or 'ios', replacing it for 'firebase' and 'apple' :param platform: `str` name of platform :return: a `str` with fixed name. """ if platform in ('firebase', 'android'): return 'firebase' elif platform in ('apple', 'ios'): return 'apple' else: return platform def log_incoming_request(task: str, host: str, loggers: dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. :param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ app_id = body.get('app_id') platform = body.get('platform') platform = platform if platform else '' sip_to = body.get('sip_to') device_id = body.get('device_id') device_id = fix_device_id(device_id) if device_id else None event = body.get('event') if task == 'log_request': payload = {} for item in body.keys(): value = body[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') payload[item] = value - level = 'deb' + level = 'info' if sip_to: if device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} ' \ f'from {host}: {payload}' elif device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f' from {host}: {payload}' log_event(msg=msg, level=level, loggers=loggers, to_file=True) elif task == 'log_success': msg = f'incoming {platform.title()} response for {request_id}: ' \ f'push accepted' level = 'info' - log_event(msg=msg, level=level, loggers=loggers) - - level = 'deb' log_event(msg=msg, level=level, loggers=loggers, to_file=True) elif task == 'log_failure': level = 'error' resp = error_msg msg = f'incoming {platform.title()} from {host} response for {request_id}, ' \ f'push rejected: {resp}' - log_event(msg=msg, level=level, loggers=loggers) log_event(loggers=loggers, msg=msg, level=level, to_file=True)