Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py
index 7e58908..1109ba8 100644
--- a/sylk/applications/webrtcgateway/storage.py
+++ b/sylk/applications/webrtcgateway/storage.py
@@ -1,183 +1,603 @@
+import datetime
+import json
import pickle as pickle
import os
from application.python.types import Singleton
+from application.system import makedirs
from collections import defaultdict
from sipsimple.threading import run_in_thread
+from sipsimple.util import ISOTimestamp
from twisted.internet import defer
+from types import SimpleNamespace
from sylk.configuration import ServerConfig
from .configuration import CassandraConfig
from .errors import StorageError
from .logger import log
__all__ = 'TokenStorage',
# TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed,
# and thus it's ok to scrap it after a reasonable amount of time.
CASSANDRA_MODULES_AVAILABLE = False
try:
from cassandra.cqlengine import columns, connection
except ImportError:
pass
else:
try:
from cassandra.cqlengine.models import Model
except ImportError:
pass
else:
CASSANDRA_MODULES_AVAILABLE = True
from cassandra import InvalidRequest
from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine.query import LWTException
from cassandra.cluster import NoHostAvailable
from cassandra.policies import DCAwareRoundRobinPolicy
from .models.storage.cassandra import PushTokens
+ from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey
if CassandraConfig.push_tokens_table:
PushTokens.__table_name__ = CassandraConfig.push_tokens_table
class FileTokenStorage(object):
def __init__(self):
self._tokens = defaultdict()
@run_in_thread('file-io')
def _save(self):
with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f:
pickle.dump(self._tokens, f)
@run_in_thread('file-io')
def load(self):
try:
tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb'))
except Exception:
pass
else:
self._tokens.update(tokens)
def __getitem__(self, key):
try:
return self._tokens[key]
except KeyError:
return {}
def add(self, account, contact_params, user_agent):
try:
(token, background_token) = contact_params['pn_tok'].split('-')
except ValueError:
token = contact_params['pn_tok']
background_token = None
data = {
'device_id': contact_params['pn_device'],
'platform': contact_params['pn_type'],
'silent': contact_params['pn_silent'],
'app_id': contact_params['pn_app'],
'user_agent': user_agent,
'background_token': background_token
}
key = f"{data['app_id']}-{data['device_id']}"
if account in self._tokens:
if isinstance(self._tokens[account], set):
self._tokens[account] = {}
# Remove old storage layout based on device id
if contact_params['pn_device'] in self._tokens[account]:
del self._tokens[account][contact_params['pn_device']]
# Remove old storage layout based on token
if token in self._tokens[account]:
del self._tokens[account][token]
# Remove old unsplit token if exists, can be removed if all tokens are stored split
if background_token is not None:
try:
del self._tokens[account][contact_params['pn_tok']]
except IndexError:
pass
self._tokens[account][key] = data
else:
self._tokens[account] = {key: data}
self._save()
def remove(self, account, app_id, device_id):
key = f'{app_id}-{device_id}'
try:
del self._tokens[account][key]
except KeyError:
pass
self._save()
class CassandraConnection(object, metaclass=Singleton):
@run_in_thread('cassandra')
def __init__(self):
try:
self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4)
except NoHostAvailable:
self.log.error("Not able to connect to any of the Cassandra contact points")
raise StorageError
class CassandraTokenStorage(object):
@run_in_thread('cassandra')
def load(self):
CassandraConnection()
def __getitem__(self, key):
deferred = defer.Deferred()
@run_in_thread('cassandra')
def query_tokens(key):
username, domain = key.split('@', 1)
tokens = {}
for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain):
tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.token,
'platform': device.platform, 'silent': device.silent,
'app_id': device.app_id, 'background_token': device.background_token}
deferred.callback(tokens)
return tokens
query_tokens(key)
return deferred
@run_in_thread('cassandra')
def add(self, account, contact_params, user_agent):
username, domain = account.split('@', 1)
try:
(token, background_token) = contact_params['pn_tok'].split('-')
except ValueError:
token = contact_params['pn_tok']
background_token = None
# Remove old unsplit token if exists, can be removed if all tokens are stored split
if background_token is not None:
try:
PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == contact_params['pn_tok']).if_exists().delete()
except LWTException:
pass
try:
PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'],
device_token=token, background_token=background_token, platform=contact_params['pn_type'],
silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent)
except (CQLEngineException, InvalidRequest) as e:
self.logger.error(f'Storing token failed: {e}')
raise StorageError
@run_in_thread('cassandra')
def remove(self, account, app_id, device_id):
username, domain = account.split('@', 1)
try:
PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain,
PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete()
except LWTException:
pass
+class FileMessageStorage(object):
+ def __init__(self):
+ self._public_keys = defaultdict()
+ self._accounts = defaultdict()
+ self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations')
+
+ def _json_dateconverter(self, o):
+ if isinstance(o, datetime.datetime):
+ return o.__str__()
+
+ @run_in_thread('file-io')
+ def _save(self):
+ with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f:
+ json.dump(self._accounts, f, default=self._json_dateconverter)
+ with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f:
+ json.dump(self._public_keys, f, default=self._json_dateconverter)
+
+ def _save_messages(self, account, messages):
+ with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f:
+ json.dump(messages, f, default=self._json_dateconverter, indent=4)
+
+ def _save_id_by_timestamp(self, account, ids):
+ with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f:
+ json.dump(ids, f, default=self._json_dateconverter)
+
+ @run_in_thread('file-io')
+ def load(self):
+ makedirs(self._storage_path)
+ try:
+ accounts = json.load(open(os.path.join(self._storage_path, 'accounts.json'), 'r'))
+ except (OSError, IOError):
+ pass
+ else:
+ self._accounts.update(accounts)
+
+ def _load_id_by_timestamp(self, account):
+ try:
+ with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f:
+ data = json.load(f)
+ return data
+ except (OSError, IOError) as e:
+ raise e
+
+ def _load_messages(self, account):
+ try:
+ with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f:
+ messages = json.load(f)
+ return messages
+ except (OSError, IOError) as e:
+ raise e
+
+ def __getitem__(self, key):
+ deferred = defer.Deferred()
+
+ @run_in_thread('file-io')
+ def query(account, message_id):
+ messages = []
+ timestamp = None
+
+ try:
+ id_by_timestamp = self._load_id_by_timestamp(account)
+ except (OSError, IOError):
+ deferred.callback(messages)
+ return
+
+ if message_id is not None:
+ try:
+ timestamp = id_by_timestamp[message_id]
+ except KeyError:
+ deferred.callback(messages)
+ return
+ else:
+ timestamp = ISOTimestamp(timestamp)
+
+ try:
+ messages = self._load_messages(account)
+ except (OSError, IOError):
+ deferred.callback(messages)
+ return
+ else:
+ if timestamp is not None:
+ messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp]
+ deferred.callback(messages)
+ else:
+ deferred.callback(messages)
+ query(key[0], key[1])
+ return deferred
+
+ def get_account(self, account):
+ try:
+ return SimpleNamespace(account=account, **self._accounts[account])
+ except KeyError:
+ return None
+
+ def get_account_token(self, account):
+ try:
+ if datetime.datetime.now() < datetime.datetime.fromisoformat(self._accounts[account]['token_expire']):
+ return self._accounts[account]['api_token']
+ return None
+ except KeyError:
+ return None
+
+ def add_account(self, account):
+ timestamp = datetime.datetime.now()
+
+ if account not in self._accounts:
+ self._accounts[account] = {'last_login': timestamp}
+ self._save()
+
+ def add_account_token(self, account, token):
+ timestamp = datetime.datetime.now()
+ if account not in self._accounts:
+ log.error(f'Updating API token for {account} failed')
+ return StorageError
+ self._accounts[account]['api_token'] = token
+ self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(seconds=26784000)
+ self._save()
+
+ @run_in_thread('file-io')
+ def update(self, account, state, message_id):
+ try:
+ messages = self._load_messages(account)
+ except (OSError, IOError):
+ return
+
+ try:
+ id_by_timestamp = self._load_id_by_timestamp(account)
+ except (OSError, IOError):
+ return
+ else:
+ try:
+ timestamp = id_by_timestamp[message_id]
+ except KeyError:
+ return
+ else:
+ for idx, message in enumerate(messages):
+ if message['created_at'] == timestamp and message['message_id'] == message_id:
+ if message['state'] != 'received':
+ message['state'] = state
+ if state == 'delivered':
+ try:
+ message['disposition'].remove('positive-delivery')
+ except ValueError:
+ pass
+ elif state == 'displayed':
+ message['disposition'] = []
+ messages[idx] = message
+ self._save_messages(account, messages)
+ break
+
+ @run_in_thread('file-io')
+ def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None):
+ try:
+ msg_timestamp = datetime.datetime.fromisoformat(timestamp)
+ except ValueError:
+ msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
+ timestamp = datetime.datetime.now()
+
+ messages = []
+ id_by_timestamp = {}
+
+ try:
+ messages = self._load_messages(account)
+ except (OSError, IOError):
+ makedirs(os.path.join(self._storage_path, account[0]))
+
+ try:
+ id_by_timestamp = self._load_id_by_timestamp(account)
+ except (OSError, IOError):
+ pass
+ else:
+ try:
+ created_at = id_by_timestamp[message_id]
+ except KeyError:
+ pass
+ else:
+ if content_type == 'message/imdn+json':
+ return
+
+ items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id]
+ if len(items) == 1:
+ return
+
+ if content_type == 'text/pgp-public-key':
+ self._public_keys[contact] = {'content': content, 'created_at': timestamp}
+ self._save()
+ return
+
+ if not isinstance(disposition_notification, list) and disposition_notification == '':
+ disposition_notification = []
+
+ message = {'account': account,
+ 'direction': direction,
+ 'contact': contact,
+ 'content_type': content_type,
+ 'content': content,
+ 'created_at': timestamp,
+ 'message_id': message_id,
+ 'disposition': disposition_notification,
+ 'state': state,
+ 'msg_timestamp': msg_timestamp}
+ messages.append(message)
+
+ self._save_messages(account, messages)
+
+ id_by_timestamp[message_id] = timestamp
+ self._save_id_by_timestamp(account, id_by_timestamp)
+
+ @run_in_thread('file-io')
+ def removeChat(self, account, contact):
+ try:
+ messages = self._load_messages(account)
+ except (OSError, IOError):
+ pass
+ else:
+ for message in messages:
+ if message['contact'] == contact:
+ messages.remove(message)
+
+ self._save_messages(account, messages)
+
+ @run_in_thread('file-io')
+ def removeMessage(self, account, message_id):
+ try:
+ id_by_timestamp = self._load_id_by_timestamp(account)
+ except (OSError, IOError):
+ return
+ else:
+ try:
+ timestamp = id_by_timestamp[message_id]
+ except KeyError:
+ return
+ else:
+ try:
+ messages = self._load_messages(account)
+ except (OSError, IOError):
+ return
+ else:
+ item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id]
+ if len(item) == 1:
+ messages.remove(item[0])
+ self._save_messages(account, messages)
+
+
+class CassandraMessageStorage(object):
+ @run_in_thread('cassandra')
+ def load(self):
+ CassandraConnection()
+
+ def __getitem__(self, key):
+ deferred = defer.Deferred()
+
+ @run_in_thread('cassandra')
+ def query_tokens(key, message_id):
+ messages = []
+ try:
+ timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
+ except (IndexError, InvalidRequest):
+ timestamp = datetime.datetime.now() - datetime.timedelta(days=3)
+ else:
+ timestamp = timestamp.created_at
+ for message in ChatMessage.objects(ChatMessage.account == key, ChatMessage.created_at > timestamp):
+ messages.append(message)
+ deferred.callback(messages)
+
+ query_tokens(key[0], key[1])
+ return deferred
+
+ def get_account(self, account):
+ deferred = defer.Deferred()
+
+ @run_in_thread('cassandra')
+ def query_tokens(account):
+ try:
+ chat_account = ChatAccount.objects(ChatAccount.account == account)[0]
+ except (IndexError, InvalidRequest):
+ deferred.callback(None)
+ else:
+ deferred.callback(chat_account)
+
+ query_tokens(account)
+ return deferred
+
+ def get_account_token(self, account):
+ deferred = defer.Deferred()
+
+ @run_in_thread('cassandra')
+ def query_tokens(account):
+ try:
+ chat_account = ChatAccount.objects(ChatAccount.account == account)[0]
+ except (IndexError, InvalidRequest):
+ deferred.callback(None)
+ else:
+ deferred.callback(chat_account.api_token)
+
+ query_tokens(account)
+ return deferred
+
+ @run_in_thread('cassandra')
+ def add_account(self, account):
+ timestamp = datetime.datetime.now()
+
+ try:
+ ChatAccount.create(account=account, last_login=timestamp)
+ except (CQLEngineException, InvalidRequest) as e:
+ log.error(f'Storing account failed: {e}')
+ raise StorageError
+
+ @run_in_thread('cassandra')
+ def add_account_token(self, account, token):
+ try:
+ chat_account = ChatAccount.objects(account=account)[0]
+ chat_account.ttl(2678400).update(api_token=token)
+ except IndexError:
+ log.error(f'Updating API token for {account} failed')
+ raise StorageError
+
+ @run_in_thread('cassandra')
+ def update(self, account, state, message_id):
+ try:
+ timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
+ except IndexError:
+ return
+ else:
+ try:
+ message = ChatMessage.objects(ChatMessage.account == account,
+ ChatMessage.created_at == timestamp.created_at,
+ ChatMessage.message_id == message_id)[0]
+ except IndexError:
+ pass
+ else:
+ if message.state != 'received':
+ message.state = state
+ if state == 'delivered':
+ try:
+ message.disposition.remove('positive-delivery')
+ except ValueError:
+ pass
+ elif state == 'displayed':
+ message.disposition.clear()
+ message.save()
+
+ @run_in_thread('cassandra')
+ def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None):
+ try:
+ msg_timestamp = datetime.datetime.fromisoformat(timestamp)
+ except ValueError:
+ msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
+ timestamp = datetime.datetime.now()
+
+ try:
+ created_at = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
+ except IndexError:
+ pass
+ else:
+ if content_type == 'message/imdn+json':
+ return
+ if ChatMessage.objects(ChatMessage.account == account,
+ ChatMessage.created_at == created_at.created_at,
+ ChatMessage.message_id == message_id).count() != 0:
+ return
+
+ if content_type == 'text/pgp-public-key':
+ try:
+ PublicKey.create(account=contact, public_key=content, created_at=timestamp)
+ ChatMessageIdMapping.create(time_stamp=timestamp, message_id=message_id)
+ except (CQLEngineException, InvalidRequest) as e:
+ log.error(f'Storing public key failed: {e}')
+ raise StorageError
+ else:
+ return
+ try:
+ ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type,
+ content=content, created_at=timestamp, message_id=message_id,
+ disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp)
+ ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id)
+ except (CQLEngineException, InvalidRequest) as e:
+ log.error(f'Storing message failed: {e}')
+ raise StorageError
+
+ @run_in_thread('cassandra')
+ def removeChat(self, account, contact):
+ try:
+ messages = ChatMessage.objects(ChatMessage.account == account)
+ except LWTException:
+ pass
+ else:
+ for message in messages:
+ if message.contact == contact:
+ message.delete()
+
+ @run_in_thread('cassandra')
+ def removeMessage(self, account, message_id):
+ try:
+ timestamp = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
+ except IndexError:
+ return
+ else:
+ try:
+ ChatMessage.objects(ChatMessage.account == account,
+ ChatMessage.created_at == timestamp.created_at,
+ ChatMessage.message_id == message_id).if_exists().delete()
+ except LWTException:
+ pass
+
+
class TokenStorage(object, metaclass=Singleton):
def __new__(self):
if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points:
return CassandraTokenStorage()
else:
return FileTokenStorage()
+
+
+class MessageStorage(object, metaclass=Singleton):
+ def __new__(self):
+ if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points:
+ return CassandraMessageStorage()
+ else:
+ return FileMessageStorage()

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:54 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408913
Default Alt Text
(23 KB)

Event Timeline