diff --git a/pushserver/pns/firebase.py b/pushserver/pns/firebase.py index 0afaca7..0f21d37 100644 --- a/pushserver/pns/firebase.py +++ b/pushserver/pns/firebase.py @@ -1,352 +1,350 @@ import json import os import time from datetime import datetime import oauth2client import requests from oauth2client.service_account import ServiceAccountCredentials from pushserver.models.requests import WakeUpRequest from requests.adapters import HTTPAdapter from urllib3 import Retry from pushserver.pns.base import PNS, PushRequest, PlatformRegister from pushserver.resources.utils import log_event, fix_non_serializable_types class FirebasePNS(PNS): """ A Firebase Push Notification service """ def __init__(self, app_id: str, app_name: str, url_push: str, voip: bool, auth_key: str = None, auth_file: str = None): """ :param app_id `str`: Application ID. :param url_push `str`: URI to push a notification. :param voip `bool`: required for apple, `True` for voip push notification type. :param auth_key `str`: A Firebase credential for push notifications. :param auth_file `str`: A Firebase credential for push notifications. """ self.app_id = app_id self.app_name = app_name self.url_push = url_push self.voip = voip self.auth_key = auth_key self.auth_file = auth_file self.error = '' self.last_update_token = None self.access_token = self.set_access_token() def set_access_token(self) -> str: """ Retrieve a valid access token that can be used to authorize requests. :return: `str` Access token. """ if not self.auth_file: self.error = f"Cannot generated Firebase access token, " \ f"no auth file provided" return '' if not os.path.exists(self.auth_file): self.error = f"Cannot generated Firebase access token, " \ f"auth file {self.auth_file} not found" return '' scopes = ['https://www.googleapis.com/auth/firebase.messaging'] try: credentials = ServiceAccountCredentials.from_json_keyfile_name(self.auth_file, scopes) oauth2client.client.logger.setLevel('CRITICAL') access_token_info = credentials.get_access_token() self.last_update_token = datetime.now() return access_token_info.access_token except Exception as e: self.error = f"Error: cannot generated Firebase access token: {e}" return '' class FirebaseRegister(PlatformRegister): def __init__(self, app_id: str, app_name: str, voip: bool, config_dict: dict, credentials_path: str, loggers: dict): self.app_id = app_id self.app_name = app_name self.voip = voip self.credentials_path = credentials_path self.config_dict = config_dict self.loggers = loggers self.error = '' self.auth_key, self.auth_file = self.set_auths() @property def url_push(self): try: return self.config_dict['firebase_push_url'] except KeyError: self.error = 'firebase_push_url not found in applications.ini' return None def set_auths(self): auth_key = None auth_file = None try: auth_key = self.config_dict['firebase_authorization_key'] except KeyError: try: auth_file = self.config_dict['firebase_authorization_file'] if self.credentials_path: auth_file = f"{self.credentials_path}/" \ f"{auth_file}" else: pass if not os.path.exists(auth_file): self.error = f'{auth_file} - no such file' except KeyError: self.error = 'not firebase_authorization_key or ' \ 'firebase_authorization_file found in applications.ini' return auth_key, auth_file @property def pns(self) -> FirebasePNS: pns = None if self.auth_key: auth_file = '' pns = FirebasePNS(app_id=self.app_id, app_name=self.app_name, url_push=self.url_push, voip=self.voip, auth_key=self.auth_key, auth_file=auth_file) elif self.auth_file: pns = FirebasePNS(app_id=self.app_id, app_name=self.app_name, url_push=self.url_push, voip=self.voip, auth_file=self.auth_file) self.error = pns.error if pns.error else '' return pns @property def register_entries(self): if self.error: return {} return {'pns': self.pns, 'access_token': self.pns.access_token, 'auth_key': self.auth_key, 'auth_file': self.auth_file} class FirebasePushRequest(PushRequest): """ Firebase push notification request """ def __init__(self, error: str, app_name: str, app_id: str, request_id: str, headers: str, payload: dict, loggers: dict, log_remote: dict, wp_request: WakeUpRequest, register: dict): """ :param error: `str` :param app_name: `str` 'linphone' or 'payload' :param app_id: `str` bundle id :param headers: `AppleHeaders` Apple push notification headers :param payload: `ApplePayload` Apple push notification payload :param wp_request: `WakeUpRequest` :param loggers: `dict` global logging instances to write messages (params.loggers) """ self.error = error self.app_name = app_name self.app_id = app_id self.platform = 'firebase' self.request_id = request_id self.headers = headers self.payload = payload self.token = wp_request.token self.wp_request = wp_request self.loggers = loggers self.log_remote = log_remote self.pns = register['pns'] self.path = self.pns.url_push self.results = self.send_notification() def requests_retry_session(self, counter=0): """ Define parameters to retry a push notification according to media_type. :param counter: `int` (optional) if retries was necessary because of connection fails Following rfc3261 specification, an exponential backoff factor is used. More specifically: backoff = 0.5 T1 = 500ms max_retries_call = 7 time_to_live_call = 64 seconds max_retries_sms = 11 time_to_live_sms ~ 2 hours """ retries = self.retries_params[self.media_type] - counter backoff_factor = self.retries_params['bo_factor'] * 0.5 * 2 ** counter status_forcelist = tuple([status for status in range(500, 600)]) session = None session = session or requests.Session() retry = Retry( total=retries, read=retries, connect=retries, backoff_factor=backoff_factor, status_forcelist=status_forcelist) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) return session - def send_notification(self) -> dict: + def send_notification(self, got401=False) -> dict: """ Send a Firebase push notification """ if self.error: self.log_error() return {'code': 500, 'body': {}, 'reason': 'Internal server error'} n_retries, backoff_factor = self.retries_params(self.wp_request.media_type) counter = 0 error = False code = 500 reason = "" body = None response = None while counter <= n_retries: self.log_request(path=self.pns.url_push) try: response = requests.post(self.pns.url_push, self.payload, headers=self.headers) break except requests.exceptions.RequestException as e: error = True reason = f'connection failed: {e}' counter += 1 timer = backoff_factor * (2 ** (counter - 1)) time.sleep(timer) if counter == n_retries: reason = "maximum retries reached" elif error: try: response = self.requests_retry_session(counter). \ post(self.pns.url_push, self.payload, headers=self.headers) except Exception as x: level = 'error' msg = f"outgoing {self.platform.title()} response for " \ f"{self.request_id}, push failed: " \ f"an error occurred in {x.__class__.__name__}" log_event(loggers=self.loggers, msg=msg, level=level) try: body = response.__dict__ except (TypeError, ValueError): code = 500 reason = 'cannot parse response body' body = {} else: reason = body.get('reason') code = response.status_code for k in ('raw', 'request', 'connection', 'cookies', 'elapsed'): try: del body[k] except KeyError: pass except TypeError: break body = json.dumps(fix_non_serializable_types(body)) if isinstance(body, str): body = json.loads(body) if code == 200: try: failure = body['_content']['failure'] except KeyError: pass else: if failure == 1: reason = body['_content']['results'][0]['error'] code = 410 elif code == 404: try: payload_code = body['_content']['error']['code'] except KeyError: pass else: if payload_code == 404: code = 410 keys = list(body.keys()) for key in keys: if not body[key]: del body[key] results = {'body': body, 'code': code, 'reason': reason, 'url': self.pns.url_push, 'call_id': self.wp_request.call_id, 'token': self.token } self.results = results self.log_results() # Request is missing required authentication credential. # Expected OAuth 2 access token, login cookie or other valid authentication # credential. UNAUTHENTICATED code = results.get('code') reason = results.get('reason') - if code == 401 and reason == 'Unauthorized': + if not got401 and code == 401 and reason == 'Unauthorized': delta = datetime.now() - self.pns.last_update_token - #if delta.total_seconds() > 300: # 5 min - if True: - level = 'warn' - msg = f"outgoing {self.platform.title()} response for request " \ - f"{self.request_id} need a new access token - " \ - f"server will refresh it and try again" - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) - # retry with a new Fireplace access token - self.pns.access_token = self.pns.set_access_token() - level = 'warn' - msg = f"outgoing {self.platform.title()} response for request " \ - f"{self.request_id} a new access token was generated - " \ - f"trying again" - log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) - - self.results = self.send_notification() + level = 'warn' + msg = f"outgoing {self.platform.title()} response for request " \ + f"{self.request_id} need a new access token - " \ + f"server will refresh it and try again" + log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) + # retry with a new Fireplace access token + self.pns.access_token = self.pns.set_access_token() + level = 'warn' + msg = f"outgoing {self.platform.title()} response for request " \ + f"{self.request_id} a new access token {self.pns.access_token} was generated - " \ + f"trying again" + log_event(loggers=self.loggers, msg=msg, level=level, to_file=True) + + self.results = self.send_notification(got401=True) return results diff --git a/pushserver/resources/utils.py b/pushserver/resources/utils.py index 749357d..f45f17b 100644 --- a/pushserver/resources/utils.py +++ b/pushserver/resources/utils.py @@ -1,273 +1,273 @@ import hashlib import json import logging import socket import ssl import time from ipaddress import ip_address from datetime import datetime __all__ = ['callid_to_uuid', 'fix_non_serializable_types', 'resources_available', 'ssl_cert', 'try_again', 'check_host', 'log_event', 'fix_device_id', 'fix_platform_name', 'log_incoming_request'] def callid_to_uuid(call_id: str) -> str: """ Generate a UUIDv4 from a callId. UUIDv4 format: five segments of seemingly random hex data, beginning with eight hex characters, followed by three four-character strings, then 12 characters at the end. These segments are separated by a “-”. :param call_id: `str` Globally unique identifier of a call. :return: a str with a uuidv4. """ hexa = hashlib.md5(call_id.encode()).hexdigest() uuidv4 = '%s-%s-%s-%s-%s' % \ (hexa[:8], hexa[8:12], hexa[12:16], hexa[16:20], hexa[20:]) return uuidv4 def fix_non_serializable_types(obj): """ Converts a non serializable object in an appropriate one, if it is possible and in a recursive way. If not, return the str 'No JSON Serializable object' :param obj: obj to convert """ if isinstance(obj, bytes): string = obj.decode() return fix_non_serializable_types(string) elif isinstance(obj, dict): return { fix_non_serializable_types(k): fix_non_serializable_types(v) for k, v in obj.items() } elif isinstance(obj, (tuple, list)): return [fix_non_serializable_types(elem) for elem in obj] elif isinstance(obj, str): try: dict_obj = json.loads(obj) return fix_non_serializable_types(dict_obj) except json.decoder.JSONDecodeError: return obj elif isinstance(obj, (bool, int, float)): return obj else: return def resources_available(host: str, port: int) -> bool: """ Check if a pair ip, port is available for a connection :param: `str` host :param: `int` port :return: a `bool` according to the test result. """ serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if not host or not port: return None try: serversocket.bind((host, port)) serversocket.close() return True except OSError: return False def ssl_cert(cert_file: str, key_file=None) -> bool: """ Check if a ssl certificate is valid. :param cert_file: `str` path to certificate file :param key_file: `str` path to key file :return: `bool` True for a valid certificate. """ ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE try: ssl_context.load_cert_chain(certfile=cert_file, keyfile=key_file) return True except (ssl.SSLError, NotADirectoryError, TypeError): return False def try_again(timer: int, host: str, port: int, start_error: str, loggers: dict) -> None: """ Sleep for a specific time and send log messages in case resources would not be available to start the app. :param timer: `int` time in seconds to wait (30 = DHCP delay) :param host: `str` IP address where app is trying to run :param port: `int` Host where app is trying to run :param start_error: `stṛ` Error msg to show in log. :param loggers: global logging instances to write messages (params.loggers) """ timer = timer # seconds, 30 for dhcp delay. level = 'error' msg = f"[can not init] on {host}:{port} - resources are not available" log_event(msg=start_error, level=level, loggers=loggers) log_event(msg=msg, level=level, loggers=loggers) msg = f'Server will try again in {timer} seconds' log_event(msg=msg, level=level, loggers=loggers) time.sleep(timer) def check_host(host, allowed_hosts) -> bool: """ Check if a host is in allowed_hosts :param host: `str` to check :return: `bool` """ if not allowed_hosts: return True for subnet in allowed_hosts: if ip_address(host) in subnet: return True return False def log_event(loggers: dict, msg: str, level: str = 'deb', to_file: bool = False) -> None: """ Write log messages into log file and in journal if specified. :param loggers: `dict` global logging instances to write messages (params.loggers) :param msg: `str` message to write :param level: `str` info, error, deb or warn :param to_file: `bool` write just in file if True """ - + if loggers.get('to_file'): logger = loggers.get('to_file') else: logger = loggers.get('to_journal') if len(logger.handlers) > 1: logger.handlers = [logger.handlers[0]] msg = f'{datetime.now()} {msg}' if level == 'info': logger.setLevel(logging.INFO) logger.info(msg) elif level == 'error': logger.setLevel(logging.ERROR) logger.error(msg) elif level == 'warn': logger.setLevel(logging.WARNING) logger.warning(msg) elif level in ('deb', 'debug'): logger.setLevel(logging.DEBUG) logger.debug(msg) def fix_device_id(device_id_to_fix: str) -> str: """ Remove special characters from uuid :param device_id_to_fix: `str` uuid with special characters. :return: a `str` with fixed uuid. """ if '>' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1].replace('>', '') elif ':' in device_id_to_fix: uuid = device_id_to_fix.split(':')[-1] else: uuid = device_id_to_fix device_id = uuid return device_id def fix_platform_name(platform: str) -> str: """ Fix platform name in case its value is 'android' or 'ios', replacing it for 'firebase' and 'apple' :param platform: `str` name of platform :return: a `str` with fixed name. """ if platform in ('firebase', 'android'): return 'firebase' elif platform in ('apple', 'ios'): return 'apple' else: return platform def log_incoming_request(task: str, host: str, loggers: dict, request_id: str = None, body: dict = None, error_msg: str = None) -> None: """ Send log messages according to type of event. :param task: `str` type of event to log, can be 'log_request', 'log_success' or 'log_failure' :param host: `str` client host where request comes from :param loggers: `dict` global logging instances to write messages (params.loggers) :param request_id: `str` request ID generated on request :param body: `dict` body of request :param error_msg: `str` to show in log """ app_id = body.get('app_id') platform = body.get('platform') platform = platform if platform else '' sip_to = body.get('sip_to') device_id = body.get('device_id') device_id = fix_device_id(device_id) if device_id else None event = body.get('event') if task == 'log_request': payload = {} for item in body.keys(): value = body[item] if item in ('sip_to', 'sip_from'): item = item.split('_')[1] else: item = item.replace('_', '-') payload[item] = value level = 'info' if sip_to: if device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} for {sip_to} ' \ f'from {host}: {payload}' elif device_id: msg = f'incoming {platform.title()} request {request_id}: ' \ f'{event} using' \ f' device {device_id} from {host}: {payload}' else: msg = f'incoming {platform.title()} request {request_id}: ' \ f' from {host}: {payload}' log_event(msg=msg, level=level, loggers=loggers, to_file=True) elif task == 'log_success': msg = f'incoming {platform.title()} response for {request_id}: ' \ f'push accepted' level = 'info' log_event(msg=msg, level=level, loggers=loggers, to_file=True) elif task == 'log_failure': level = 'error' resp = error_msg msg = f'incoming {platform.title()} from {host} response for {request_id}, ' \ f'push rejected: {resp}' log_event(loggers=loggers, msg=msg, level=level, to_file=True)