diff --git a/sylk-db-maintenance b/sylk-db-maintenance index 5eb749f..7190e50 100755 --- a/sylk-db-maintenance +++ b/sylk-db-maintenance @@ -1,357 +1,742 @@ #!/usr/bin/env python3 import argparse import sys import os from application import log from application.process import process CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine.query import LWTException from cassandra.cluster import Cluster, Session, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import DCAwareRoundRobinPolicy from cassandra.cqlengine.management import sync_table, create_keyspace_simple class colors: if sys.stdout.isatty(): TGREEN = '\033[32m' ENDC = '\033[m' BOLD = '\033[1m' else: TGREEN = '' ENDC = '' BOLD = '' class parse_level: def format(record): if record.levelname != 'INFO': return f'{record.levelname:<8s} ' else: return f"{' ......':<8s} " def ask(question): try: while (res := input(colors.TGREEN + f"{'>>':>8s} {question} (Enter y/n) " + colors.ENDC).lower()) not in {"y", "n"}:pass except KeyboardInterrupt: sys.exit(1) if res == "y": return True return False +def ask_num(question, num_range): + while True: + try: + res = input(colors.TGREEN + f"{'>>':>8s} {question} (Enter a number [0-{num_range}] or all) " + colors.ENDC) + except (EOFError, KeyboardInterrupt): + print('\n') + sys.exit(1) + try: + res = int(res) + except ValueError: + if res == 'all': + break + log.info('Enter a valid number') + continue + else: + if res not in range(0, num_range): + log.info('Enter a valid number') + continue + break + return res + + def bold(string): return colors.BOLD + string + colors.ENDC +def green(string): + return colors.TGREEN + string + colors.ENDC + + +def bold_green(string): + return colors.BOLD + colors.TGREEN + string + colors.ENDC + + def db_init(): log.info(f"\n{' SylkServer - Cassandra database create/maintenance ':*^80s}\n") log.warn('Please note, this script can potentially destroy the data in the configured Cassandra keyspace.') log.warn('Make sure you have a backup if you already have data in the Cassandra cluster') if not ask("Would you like to continue?"): sys.exit() os.environ['CQLENG_ALLOW_SCHEMA_MANAGEMENT'] = '1' from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE: if CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) if CassandraConfig.keyspace in cluster.metadata.keyspaces: log.info(f"Keyspace {bold(CassandraConfig.keyspace)} is already on the server.") else: log.warning(f"Keyspace {bold(CassandraConfig.keyspace)} is {bold('not')} defined on the server") if ask("Would you like to create the keyspace with SimpleStrategy?"): create_keyspace_simple(CassandraConfig.keyspace, 1) else: sys.exit(1) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace if table.__table_name__ in cluster.metadata.keyspaces[CassandraConfig.keyspace].tables: log.info(f'Table {bold(table.__table_name__)} is in the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to update the schema with the model?"): sync_table(table) else: log.info(f'Table {bold(table.__table_name__)} is not the keyspace {bold(CassandraConfig.keyspace)}') if ask("Would you like to create the table from the model?"): sync_table(table) else: log.warning("Cassandra cluster contact points are not set, please adjust webrtcgateway.ini'") sys.exit() else: log.warning('The python Cassandra drivers are not installed, please make sure they are installed') sys.exit() def remove_account(account): log.info(f"\n{' SylkServer - Remove account ':*^80s}\n") log.warn(f'Please note, will destroy {bold("ALL")} data of the account {account}') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') try: accounts = ChatAccount.objects(ChatAccount.account == account) except LWTException: pass else: size = len(accounts) for acc in accounts: acc.delete() log.info(f'Removed {account} from accounts') username, domain = account.split('@') try: push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) except LWTException: pass else: size = len(push_tokens) for token in push_tokens: token.delete() log.info(f'Removed {size} push tokens from {account}') try: public_keys = PublicKey.objects(PublicKey.account == account) except LWTException: pass else: size = len(public_keys) for key in public_keys: key.delete() pass log.info(f'Removed public Key from {account}') else: log.warning('The python Cassandra drivers are not installed.') log.warning('We will use the JSON and pickle backend to wipe the account data') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage from sipsimple.threading import ThreadManager import time log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 storage = TokenStorage() storage.load() time.sleep(.5) storage.removeAll(account) storage = MessageStorage() storage.load() time.sleep(.5) storage.remove_account(account) storage.remove_public_key(account) storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) ThreadManager().stop() def remove_messages(account, contact): log.info(f"\n{' SylkServer - Remove messages ':*^80s}\n") if contact is not None: log.warn(f'Please note, will destroy the messages from {contact} for the account {account} ') else: log.warn(f'Please note, will destroy {bold("ALL")} messages of the account {account}') if not ask("Would you like to continue?"): sys.exit() from sylk.applications.webrtcgateway.configuration import CassandraConfig from sylk.applications.webrtcgateway.models.storage.cassandra import ChatMessage from sylk.applications.webrtcgateway.storage import MessageStorage from sipsimple.threading import ThreadManager log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if contact is not None: storage = MessageStorage() storage.load() storage.removeChat(account, contact) ThreadManager().stop() log.info(f'Messages from {contact} for {account} removed') sys.exit() configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) if configuration.files: log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: profile = ExecutionProfile( load_balancing_policy=DCAwareRoundRobinPolicy(), request_timeout=60 ) cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) try: session = cluster.connect() except NoHostAvailable as e: log.warning("Can't connect to Cassandra cluster") sys.exit() else: connection.set_session(session) keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') tables = [ChatMessage] for table in tables: table.__keyspace__ = CassandraConfig.keyspace try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: size = len(messages) for message in messages: message.delete() log.info(f'Removed {size} messages from {account}') else: storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') if os.path.exists(os.path.join(storage_path, account[0], f'{account}_messages.json')): os.remove(os.path.join(storage_path, account[0], f'{account}_messages.json')) - log.info('Messages for {account} removed') + log.info(f'Messages for {account} removed') + + +def remove_data(account, data_type): + data_types = ['push_token', 'api_token', 'public_key'] + if data_type not in data_types: + return + + log.info(f"\n{' SylkServer - Remove '+ data_type + ' ':*^80s}\n") + log.warn(f'Please note, this will remove the public key the account {account}') + if not ask("Would you like to continue?"): + sys.exit() + + from sylk.applications.webrtcgateway.configuration import CassandraConfig + from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey + + log.Formatter.prefix_format = parse_level + log.Formatter.prefix_length = 0 + + configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) + if configuration.files: + log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) + + if CassandraConfig.push_tokens_table: + PushTokens.__table_name__ = CassandraConfig.push_tokens_table + + if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: + profile = ExecutionProfile( + load_balancing_policy=DCAwareRoundRobinPolicy(), + request_timeout=60 + ) + cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) + try: + session = cluster.connect() + except NoHostAvailable as e: + log.warning("Can't connect to Cassandra cluster") + sys.exit() + else: + connection.set_session(session) + keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] + log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') + tables = [PushTokens, ChatAccount, PublicKey] + + for table in tables: + table.__keyspace__ = CassandraConfig.keyspace + + if data_type == 'api_token': + try: + accounts = ChatAccount.objects(ChatAccount.account == account) + except LWTException: + pass + else: + size = len(accounts) + for acc in accounts: + acc.api_token = '' + acc.save(); + log.info(f'Api token for {account} removed') + return + + if data_type == 'push_token': + username, domain = account.split('@') + try: + push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) + except LWTException: + pass + else: + size = len(push_tokens) + log.info(f'{bold(str(size))} push token(s) stored\n') + if size >= 2: + for idx, token in enumerate(push_tokens): + log.info(f'[{idx}]: {"App:":>13s} {token.app_id}\n{"Device ID:":>18s} {token.device_id}\n{"Token:":>18s} {token.device_token[:20]} ...\n') + num = ask_num("Pick a token to delete", size-1) + if num == 'all': + for token in push_tokens: + log.info(f'Removed token {token.device_token[:20]} from {account}') + token.delete() + else: + log.info(f'Removed {push_tokens[num].device_token[:20]} from {account}') + push_tokens[num].delete() + else: + for token in push_tokens: + log.info(f'Removed token {token.device_token[:20]} from {account}') + token.delete() + return + + if data_type == 'public_key': + try: + public_keys = PublicKey.objects(PublicKey.account == account) + except LWTException: + pass + else: + size = len(public_keys) + for key in public_keys: + key.delete() + pass + log.info(f'Removed public Key from {account}') + return + else: + log.warning('The python Cassandra drivers are not installed.') + log.warning('We will use the JSON and pickle backend to wipe the account data') + if not ask("Would you like to continue?"): + sys.exit() + + from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage + from sipsimple.threading import ThreadManager + import time + log.Formatter.prefix_format = parse_level + log.Formatter.prefix_length = 0 + if data_type == 'api_token': + storage = MessageStorage() + storage.load() + time.sleep(.5) + storage.remove_account_token(account) + log.info(f'Api token for {account} removed') + return + + if data_type == 'push_token': + storage = TokenStorage() + storage.load() + time.sleep(.5) + push_tokens = storage[account] + if push_tokens: + size = len(push_tokens) + log.info(f'{bold(str(size))} push token(s) stored\n') + if size >= 2: + for idx, token in enumerate(push_tokens): + token = push_tokens[token] + log.info(f"[{idx}]: {'App:':>13s} {token['app_id']}\n{'Device ID:':>18s} {token['device_id']}\n{'Token:':>18s} {token['device_token'][:20]} ...\n") + + num = ask_num("Pick a token to delete", size - 1) + if num == 'all': + for token in push_tokens: + token = push_tokens[token] + log.info(f"Removed token {token['device_token'][:20]} from {account}") + storage.remove(account, token['app_id'], token['device_id']) + else: + for idx, token in enumerate(push_tokens): + token = push_tokens[token] + if idx == num: + log.info(f"Removed {token['device_token'][:20]} from {account}") + storage.remove(account, token['app_id'], token['device_id']) + else: + for token in push_tokens: + token = push_tokens[token] + log.info(f"Removed token {token['device_token'][:20]} from {account}") + storage.remove(account, token['app_id'], token['device_id']) + return + + if data_type == 'public_key': + storage = TokenStorage() + storage.load() + time.sleep(.5) + storage.remove_public_key(account) + log.info(f'Removed public Key from {account}') + return + + +def show(account): + log.info(f"\n{' SylkServer - Show account data ':*^80s}\n") + + from sylk.applications.webrtcgateway.configuration import CassandraConfig + from sylk.applications.webrtcgateway.models.storage.cassandra import PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey + + log.Formatter.prefix_format = parse_level + log.Formatter.prefix_length = 0 + + configuration = CassandraConfig.__cfgtype__(CassandraConfig.__cfgfile__) + if configuration.files: + log.info('Reading storage configuration from {}'.format(', '.join(configuration.files))) + + if CassandraConfig.push_tokens_table: + PushTokens.__table_name__ = CassandraConfig.push_tokens_table + + if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: + profile = ExecutionProfile( + load_balancing_policy=DCAwareRoundRobinPolicy(), + request_timeout=60 + ) + cluster = Cluster(CassandraConfig.cluster_contact_points, protocol_version=4, execution_profiles={EXEC_PROFILE_DEFAULT: profile}) + try: + session = cluster.connect() + except NoHostAvailable as e: + log.warning("Can't connect to Cassandra cluster") + sys.exit() + else: + connection.set_session(session) + keyspace = cluster.metadata.keyspaces[CassandraConfig.keyspace] + log.info(f'Server has keyspace {bold(keyspace.name)} with replication strategy: {keyspace.replication_strategy.name}') + tables = [PushTokens, ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey] + + for table in tables: + table.__keyspace__ = CassandraConfig.keyspace + + log.info(f"\n{'-'*80}\n{ account :^80s}\n{'-'*80}") + + try: + accounts = ChatAccount.objects(ChatAccount.account == account) + except LWTException: + pass + else: + if not len(accounts): + log.info(f'Message storage is {bold("not")} enabled') + else: + log.info(f'{green("Message storage is")} {bold_green("enabled")}') + + for acc in accounts: + if acc.api_token: + row = session.execute(f'SELECT TTL(api_token) as ttl FROM {ChatAccount.__keyspace__}.{ChatAccount.__table_name__} where account=\'{acc.account}\'') + log.info(f"{'API token for replication:':26s} {bold(str(acc.api_token))}\n{'TTL:':>26s} {row.one()['ttl']}\n") + + log.info(f'{"Last login at: "} {acc.last_login}\n') + try: + messages = ChatMessage.objects(ChatMessage.account == account) + except LWTException: + pass + else: + size = len(messages) + log.info(f'{bold(str(size))} messages stored') + text_messages = [message for message in messages if message.content_type.startswith('text')] + imdn_messages = [message for message in messages if message.content_type.startswith('message/imdn')] + other_messages = [message for message in messages if not message.content_type.startswith('message/imdn') and not message.content_type.startswith('text')] + + log.info(f'\n{"Text Messages:":>23s} {len(text_messages)}\n{"IMDN messages:":>23s} {len(imdn_messages)}\n{"Other Messages:":>23s} {len(other_messages)}\n') + unread_messages = [message for message in messages if ((message.content_type == 'text/plain' or message.content_type == 'text/html') + and message.direction == 'incoming' and message.contact != account + and 'display' in message.disposition)] + log.info(f'{"Unread text Messages:":>23s} {len(unread_messages)}\n') + username, domain = account.split('@') + try: + push_tokens = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain) + except LWTException: + pass + else: + size = len(push_tokens) + log.info(f'{bold(str(size))} push token(s) stored\n') + for token in push_tokens: + log.info(f'{"App:":>18s} {token.app_id}\n{"Device ID:":>18s} {token.device_id}\n{"Token:":>18s} {token.device_token[:20]} ...\n') + + # from sylk.applications.webrtcgateway.storage import FileTokenStorage + # storage = FileTokenStorage() + # test = {} + # test['pn_device'] = token.device_id + # test['pn_silent'] = token.silent + # test['pn_tok'] = f'{token.device_token}-{token.background_token}' + # test['pn_app'] = token.app_id + # test['pn_type'] = token.platform + # storage.load() + # import time + # time.sleep(.5) + # print(test) + # storage.add(account, test, token.user_agent) + + + try: + public_keys = PublicKey.objects(PublicKey.account == account) + except LWTException: + pass + else: + size = len(public_keys) + log.info(f'{bold(str(size))} public key(s) stored\n') + log.Formatter.prefix_format = parse_level_indent + log.Formatter.prefix_length = 0 + for key in public_keys: + log.info(f'{key.public_key}') + log.Formatter.prefix_format = parse_level + log.Formatter.prefix_length = 0 + else: + log.warning('The python Cassandra drivers are not installed.') + log.warning('We will use the JSON and pickle backend for the account data') + import time + from twisted.internet import defer + from sylk.applications.webrtcgateway.storage import TokenStorage, MessageStorage + from sipsimple.threading import ThreadManager + + log.info(f"\n{'-'*80}\n{ account :^80s}\n{'-'*80}") + + def print_messages(messages): + size = len(messages) + log.info(f'{bold(str(size))} messages stored') + text_messages = [message for message in messages if message['content_type'].startswith('text')] + imdn_messages = [message for message in messages if message['content_type'].startswith('message/imdn')] + other_messages = [message for message in messages if not message['content_type'].startswith('message/imdn') and not message['content_type'].startswith('text')] + + log.info(f'\n{"Text Messages:":>23s} {len(text_messages)}\n{"IMDN messages:":>23s} {len(imdn_messages)}\n{"Other Messages:":>23s} {len(other_messages)}\n') + + unread_messages = [message for message in messages if ((message['content_type'] == 'text/plain' or message['content_type'] == 'text/html') + and message['direction'] == 'incoming' and message['contact'] != account + and 'display' in message.disposition)] + log.info(f'{"Unread text Messages:":>23s} {len(unread_messages)}\n') + + storage = MessageStorage() + storage.load() + time.sleep(.5) + acc = storage.get_account(account) + public_key = storage.get_public_key(account) + if not acc: + log.info(f'Message storage is {bold("not")} enabled') + else: + log.info(f'{green("Message storage is")} {bold_green("enabled")}') + token = storage.get_account_token(account) + if token: + ttl = storage._accounts[account]['token_expire'] + log.info(f"{'API token for replication:':26s} {bold(str(token))}\n{'TTL:':>26s} {ttl}") + + last_login = storage._accounts[account]['last_login'] + log.info(f'{"Last login at: "} {last_login}\n') + + msg = storage[[account, None]] + if isinstance(msg, defer.Deferred): + msg.addCallback(lambda result: print_messages(result)) + + storage = TokenStorage() + storage.load() + time.sleep(.5) + push_tokens = storage[account] + if push_tokens: + size = len(push_tokens) + log.info(f'{bold(str(size))} push token(s) stored\n') + for token in push_tokens: + token = push_tokens[token] + log.info(f"{'App:':>18s} {token['app_id']}\n{'Device ID:':>18s} {token['device_id']}\n{'Token:':>18s} {token['device_token'][:20]} ...\n") + + size = 0 + if public_key: + size = 1 + + log.info(f'{bold(str(size))} public key(s) stored\n') + log.Formatter.prefix_format = parse_level_indent + log.Formatter.prefix_length = 0 + if public_key: + log.info(f'{public_key}') + + log.Formatter.prefix_format = parse_level + log.Formatter.prefix_length = 0 + time.sleep(1) + ThreadManager().stop() if __name__ == '__main__': process.configuration.subdirectory = 'sylkserver' parser = argparse.ArgumentParser(add_help=False) parser.add_argument('-h', '--help', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') parser.add_argument("--config-dir", dest='config_directory', default=None, metavar='PATH', help="Set a config directory.") subparsers = parser.add_subparsers(dest='action') dbinit = subparsers.add_parser('dbinit', help='initialize/Update database (default)') - remove = subparsers.add_parser('remove', help='remove all data from an account') - remove.add_argument('account', help='Account') + remove = subparsers.add_parser('remove', help='remove data from an account') + subparsers_remove = remove.add_subparsers(dest='remove_action') - remove_messages_sub = subparsers.add_parser('remove_messages', help='remove all messages from an account') + remove_all = subparsers_remove.add_parser('all', help='remove all data from an account') + remove_all.add_argument('account', help='Account') + + remove_messages_sub = subparsers_remove.add_parser('messages', help='remove all messages from an account') remove_messages_sub.add_argument('account', help='Account') remove_messages_sub.add_argument('contact', nargs='?', help='optional contact to remove messages from') + remove_api_token = subparsers_remove.add_parser('api_token', help='remove api token from an account') + remove_api_token.add_argument('account', help='Account') + + remove_push_token = subparsers_remove.add_parser('push_token', help='remove push token(s) from an account') + remove_push_token.add_argument('account', help='Account') + + remove_public_key = subparsers_remove.add_parser('public_key', help='remove public key from an account') + remove_public_key.add_argument('account', help='Account') + + show_data = subparsers.add_parser('show', help='show all data from an account') + show_data.add_argument('account', help='Account') + options = parser.parse_args() if options.config_directory is not None: process.configuration.local_directory = options.config_directory log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 from sylk.server import ServerConfig log.Formatter.prefix_format = parse_level log.Formatter.prefix_length = 0 if not options.action: parser.print_help() sys.exit() configuration = ServerConfig.__cfgtype__(ServerConfig.__cfgfile__) if configuration.files: log.info('Reading configuration from {}'.format(', '.join(configuration.files))) else: log.info('Not reading any configuration files (using internal defaults)') if options.action == 'remove': - remove_account(options.account) - elif options.action == 'remove_messages': - remove_messages(options.account, options.contact) + if options.remove_action == 'all': + remove_account(options.account) + elif options.remove_action == 'remove_messages': + remove_messages(options.account, options.contact) + elif options.remove_action in ['api_token', 'push_token', 'public_key']: + remove_data(options.account, options.remove_action) + elif options.action == 'show': + show(options.account) elif options.action == 'dbinit': db_init() diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index 1b51da3..b017c9b 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,702 +1,707 @@ import datetime import json import pickle as pickle import os from application.python.types import Singleton from application.system import makedirs from collections import defaultdict from sipsimple.threading import run_in_thread from sipsimple.util import ISOTimestamp from twisted.internet import defer from types import SimpleNamespace from sylk.configuration import ServerConfig from .configuration import CassandraConfig from .errors import StorageError from .logger import log __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra import InvalidRequest from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.query import LWTException from cassandra.cluster import NoHostAvailable from cassandra.policies import DCAwareRoundRobinPolicy from .models.storage.cassandra import PushTokens from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey if CassandraConfig.push_tokens_table: PushTokens.__table_name__ = CassandraConfig.push_tokens_table class FileTokenStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: token = contact_params['pn_tok'] background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app_id': contact_params['pn_app'], 'user_agent': user_agent, 'background_token': background_token, 'device_token': token } key = f"{data['app_id']}-{data['device_id']}" if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] # Remove old storage layout based on token if token in self._tokens[account]: del self._tokens[account][token] # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: del self._tokens[account][contact_params['pn_tok']] except (IndexError, KeyError): pass self._tokens[account][key] = data else: self._tokens[account] = {key: data} self._save() def remove(self, account, app_id, device_id): key = f'{app_id}-{device_id}' try: del self._tokens[account][key] except KeyError: pass self._save() def removeAll(self, account): try: del self._tokens[account] except KeyError: pass self._save() class CassandraConnection(object, metaclass=Singleton): @run_in_thread('cassandra') def __init__(self): try: self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4) except NoHostAvailable: log.error("Not able to connect to any of the Cassandra contact points") raise StorageError class CassandraTokenStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token, 'platform': device.platform, 'silent': device.silent, 'app_id': device.app_id, 'background_token': device.background_token} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) token = contact_params['pn_tok'] background_token = None if contact_params['pn_type'] == 'ios': try: (token, background_token) = contact_params['pn_tok'].split('-') except ValueError: pass # Remove old unsplit token if exists, can be removed if all tokens are stored split if background_token is not None: try: old_token = PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain)[0] except (IndexError, LWTException): pass else: if old_token.device_token == contact_params['pn_tok']: old_token.delete() try: PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing token failed: {e}') raise StorageError @run_in_thread('cassandra') def remove(self, account, app_id, device_id): username, domain = account.split('@', 1) try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete() except LWTException: pass class FileMessageStorage(object): def __init__(self): self._public_keys = defaultdict() self._accounts = defaultdict() self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations') def _json_dateconverter(self, o): if isinstance(o, datetime.datetime): return o.__str__() @run_in_thread('file-io') def _save(self): with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f: json.dump(self._accounts, f, default=self._json_dateconverter) with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f: json.dump(self._public_keys, f, default=self._json_dateconverter) def _save_messages(self, account, messages): with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f: json.dump(messages, f, default=self._json_dateconverter, indent=4) def _save_id_by_timestamp(self, account, ids): with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f: json.dump(ids, f, default=self._json_dateconverter) @run_in_thread('file-io') def load(self): makedirs(self._storage_path) try: accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r')) except (OSError, IOError): pass else: self._accounts.update(accounts) try: public_keys = json.load(open(os.path.join(self._storage_path, 'public_keys.json'), 'r')) except (OSError, IOError): pass else: self._public_keys.update(public_keys) def _load_id_by_timestamp(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f: data = json.load(f) return data except (OSError, IOError) as e: raise e def _load_messages(self, account): try: with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f: messages = json.load(f) return messages except (OSError, IOError) as e: raise e def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('file-io') def query(account, message_id): messages = [] timestamp = None try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): deferred.callback(messages) return if message_id is not None: try: timestamp = id_by_timestamp[message_id] except KeyError: deferred.callback(messages) return else: timestamp = ISOTimestamp(timestamp) try: messages = self._load_messages(account) except (OSError, IOError): deferred.callback(messages) return else: if timestamp is not None: messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp] deferred.callback(messages) else: deferred.callback(messages) query(key[0], key[1]) return deferred def get_account(self, account): try: return SimpleNamespace(account=account, **self._accounts[account]) except KeyError: return None def get_account_token(self, account): try: if datetime.datetime.now() < datetime.datetime.fromisoformat(self._accounts[account]['token_expire']): return self._accounts[account]['api_token'] return None except KeyError: return None + def remove_account_token(self, account): + if account in self._accounts: + self._accounts[account]['api_token'] = '' + self._save() + def add_account(self, account): timestamp = datetime.datetime.now() if account not in self._accounts: self._accounts[account] = {'last_login': timestamp} self._save() else: self._accounts[account]['last_login'] = timestamp self._save() def get_public_key(self, account): try: return self._public_keys[account]['content'] except KeyError: return None def remove_account(self, account): if account in self._accounts: del self._accounts[account] self._save() - def remove_public_key_account(self, account): - if account in self._accounts: + def remove_public_key(self, account): + if account in self._public_keys: del self.public_keys[account] self._save() def add_account_token(self, account, token): timestamp = datetime.datetime.now() if account not in self._accounts: log.error(f'Updating API token for {account} failed') return StorageError self._accounts[account]['api_token'] = token self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000) self._save() @run_in_thread('file-io') def mark_conversation_read(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): return else: for idx, message in enumerate(messages): if message['contact'] == contact: message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) @run_in_thread('file-io') def update(self, account, state, message_id): try: messages = self._load_messages(account) except (OSError, IOError): return try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: for idx, message in enumerate(messages): if message['created_at'] == timestamp and message['message_id'] == message_id: if message['state'] != 'received': message['state'] = state if state == 'delivered': try: message['disposition'].remove('positive-delivery') except ValueError: pass elif state == 'displayed': message['disposition'] = [] messages[idx] = message self._save_messages(account, messages) break @run_in_thread('file-io') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() messages = [] id_by_timestamp = {} try: messages = self._load_messages(account) except (OSError, IOError): makedirs(os.path.join(self._storage_path, account[0])) timestamp_not_found = True try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): pass else: try: created_at = id_by_timestamp[message_id] except KeyError: pass else: timestamp_not_found = False if content_type == 'message/imdn+json': return items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id] if len(items) == 1: return if content_type == 'text/pgp-public-key': key = account if direction == "outgoing" else contact self._public_keys[key] = {'content': content, 'created_at': timestamp} self._save() return if content_type == 'text/pgp-private-key': if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) return if not isinstance(disposition_notification, list) and disposition_notification == '': disposition_notification = [] message = {'account': account, 'direction': direction, 'contact': contact, 'content_type': content_type, 'content': content, 'created_at': timestamp, 'message_id': message_id, 'disposition': disposition_notification, 'state': state, 'msg_timestamp': msg_timestamp} messages.append(message) self._save_messages(account, messages) if timestamp_not_found: id_by_timestamp[message_id] = timestamp self._save_id_by_timestamp(account, id_by_timestamp) @run_in_thread('file-io') def removeChat(self, account, contact): try: messages = self._load_messages(account) except (OSError, IOError): pass else: messages[:] = [message for message in messages if message['contact'] != contact] self._save_messages(account, messages) @run_in_thread('file-io') def removeMessage(self, account, message_id): try: id_by_timestamp = self._load_id_by_timestamp(account) except (OSError, IOError): return else: try: timestamp = id_by_timestamp[message_id] except KeyError: return else: try: messages = self._load_messages(account) except (OSError, IOError): return else: item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id] if len(item) == 1: messages.remove(item[0]) self._save_messages(account, messages) class CassandraMessageStorage(object): @run_in_thread('cassandra') def load(self): CassandraConnection() def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_messages(key, message_id): messages = [] try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except (IndexError, InvalidRequest): timestamp = datetime.datetime.now() - datetime.timedelta(days=3) else: timestamp = timestamp.created_at for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp): messages.append(message) deferred.callback(messages) query_messages(key[0], key[1]) return deferred def get_account(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account) query_tokens(account) return deferred def get_account_token(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(account): try: chat_account = ChatAccount.objects(ChatAccount.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(chat_account.api_token) query_tokens(account) return deferred @run_in_thread('cassandra') def add_account(self, account): timestamp = datetime.datetime.now() try: ChatAccount.create(account=account, last_login=timestamp) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing account failed: {e}') raise StorageError @run_in_thread('cassandra') def add_account_token(self, account, token): try: chat_account = ChatAccount.objects(account=account)[0] chat_account.ttl(2678400).update(api_token=token) except IndexError: log.error(f'Updating API token for {account} failed') raise StorageError @run_in_thread('cassandra') def mark_conversation_read(self, account, contact): for message in ChatMessage.objects(ChatMessage.account == account): if message.contact == contact: if message.content_type == 'application/sylk-conversation-read': message.delete() else: message.disposition.clear() message.save() def get_public_key(self, account): deferred = defer.Deferred() @run_in_thread('cassandra') def query_key(account): try: public_key = PublicKey.objects(PublicKey.account == account)[0] except (IndexError, InvalidRequest): deferred.callback(None) else: deferred.callback(public_key.public_key) query_key(account) return deferred @run_in_thread('cassandra') def update(self, account, state, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: message = ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id)[0] except IndexError: pass else: if message.state != 'received': message.state = state if state == 'delivered': try: message.disposition.remove('positive-delivery') except ValueError: pass elif state == 'displayed': message.disposition.clear() message.save() @run_in_thread('cassandra') def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None): try: msg_timestamp = datetime.datetime.fromisoformat(timestamp) except ValueError: msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z") timestamp = datetime.datetime.now() timestamp_not_found = True try: created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: pass else: timestamp_not_found = False timestamp = created_at.created_at if content_type == 'message/imdn+json': return if ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == created_at.created_at, ChatMessage.message_id == message_id).count() != 0: return if content_type == 'text/pgp-public-key': try: PublicKey.create(account=account if direction == "outgoing" else contact, public_key=content, created_at=timestamp) ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing public key failed: {e}') raise StorageError else: return if content_type == 'text/pgp-private-key': if timestamp_not_found: try: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest): pass return try: ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type, content=content, created_at=timestamp, message_id=message_id, disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp) if timestamp_not_found: ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id) except (CQLEngineException, InvalidRequest) as e: log.error(f'Storing message failed: {e}') raise StorageError @run_in_thread('cassandra') def removeChat(self, account, contact): try: messages = ChatMessage.objects(ChatMessage.account == account) except LWTException: pass else: for message in messages: if message.contact == contact: message.delete() @run_in_thread('cassandra') def removeMessage(self, account, message_id): try: timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0] except IndexError: return else: try: ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at == timestamp.created_at, ChatMessage.message_id == message_id).if_exists().delete() except LWTException: pass class TokenStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraTokenStorage() else: return FileTokenStorage() class MessageStorage(object, metaclass=Singleton): def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraMessageStorage() else: return FileMessageStorage()