Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159477
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
23 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py
index 7e58908..1109ba8 100644
--- a/sylk/applications/webrtcgateway/storage.py
+++ b/sylk/applications/webrtcgateway/storage.py
@@ -1,183 +1,603 @@
+import datetime
+import json
import pickle as pickle
import os
from application.python.types import Singleton
+from application.system import makedirs
from collections import defaultdict
from sipsimple.threading import run_in_thread
+from sipsimple.util import ISOTimestamp
from twisted.internet import defer
+from types import SimpleNamespace
from sylk.configuration import ServerConfig
from .configuration import CassandraConfig
from .errors import StorageError
from .logger import log
# Public API of this module: both storage facades defined at the bottom of the file.
__all__ = 'TokenStorage', 'MessageStorage'
# TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed,
# and thus it's ok to scrap it after a reasonable amount of time.
# Probe for the optional Cassandra driver.  The flag is only switched on when
# both the cqlengine core and its model layer import successfully; in every
# other case it stays False.
CASSANDRA_MODULES_AVAILABLE = False
try:
    from cassandra.cqlengine import columns, connection
    from cassandra.cqlengine.models import Model
except ImportError:
    pass
else:
    CASSANDRA_MODULES_AVAILABLE = True
    from cassandra import InvalidRequest
    from cassandra.cqlengine import CQLEngineException
    from cassandra.cqlengine.query import LWTException
    from cassandra.cluster import NoHostAvailable
    from cassandra.policies import DCAwareRoundRobinPolicy
    from .models.storage.cassandra import PushTokens
    from .models.storage.cassandra import ChatAccount, ChatMessage, ChatMessageIdMapping, PublicKey

    # Allow the push token table name to be overridden from the configuration.
    if CassandraConfig.push_tokens_table:
        PushTokens.__table_name__ = CassandraConfig.push_tokens_table
class FileTokenStorage(object):
    """Push token store kept in memory and pickled to the spool directory.

    Tokens are held as ``{account: {'<app_id>-<device_id>': data}}`` and the
    whole mapping is written to ``<spool_dir>/webrtc_device_tokens`` after
    every change.
    """

    def __init__(self):
        # defaultdict() without a factory behaves like a plain dict; the type
        # is kept so the pickled representation does not change.
        self._tokens = defaultdict()

    @run_in_thread('file-io')
    def _save(self):
        """Pickle the complete token mapping to the spool file."""
        with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f:
            pickle.dump(self._tokens, f)

    @run_in_thread('file-io')
    def load(self):
        """Merge previously saved tokens into memory.

        A missing or unreadable spool file is ignored (best effort).
        """
        try:
            # The context manager closes the handle even if unpickling fails.
            # NOTE(review): pickle is only safe because this file is written
            # by the server itself; never point it at untrusted data.
            with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb') as f:
                tokens = pickle.load(f)
        except Exception:
            pass
        else:
            self._tokens.update(tokens)

    def __getitem__(self, key):
        """Return the device mapping stored for account ``key`` or ``{}``."""
        try:
            return self._tokens[key]
        except KeyError:
            return {}

    def add(self, account, contact_params, user_agent):
        """Store the push parameters of one device for ``account``.

        ``contact_params`` must provide ``pn_tok``, ``pn_device``, ``pn_type``,
        ``pn_silent`` and ``pn_app``.  A ``pn_tok`` of the form ``a-b`` is
        split into the token and the background token.
        """
        try:
            (token, background_token) = contact_params['pn_tok'].split('-')
        except ValueError:
            token = contact_params['pn_tok']
            background_token = None

        # NOTE(review): the split-out ``token`` is not part of ``data``;
        # confirm whether consumers of this storage expect it to be there.
        data = {
            'device_id': contact_params['pn_device'],
            'platform': contact_params['pn_type'],
            'silent': contact_params['pn_silent'],
            'app_id': contact_params['pn_app'],
            'user_agent': user_agent,
            'background_token': background_token
        }
        key = f"{data['app_id']}-{data['device_id']}"

        if account in self._tokens:
            if isinstance(self._tokens[account], set):
                self._tokens[account] = {}
            devices = self._tokens[account]

            # Remove old storage layout based on device id
            devices.pop(contact_params['pn_device'], None)

            # Remove old storage layout based on token
            devices.pop(token, None)

            # Remove old unsplit token if exists, can be removed if all tokens are stored split.
            # pop() with a default is used because a missing dict key raises
            # KeyError (not IndexError), which previously aborted the method.
            if background_token is not None:
                devices.pop(contact_params['pn_tok'], None)

            devices[key] = data
        else:
            self._tokens[account] = {key: data}
        self._save()

    def remove(self, account, app_id, device_id):
        """Forget the device ``app_id``/``device_id`` of ``account`` if it is known."""
        key = f'{app_id}-{device_id}'
        try:
            del self._tokens[account][key]
        except KeyError:
            pass
        self._save()
class CassandraConnection(object, metaclass=Singleton):
    """Sets up the cqlengine connection using the values from CassandraConfig."""

    @run_in_thread('cassandra')
    def __init__(self):
        """Connect to the configured contact points.

        Raises StorageError when none of the contact points can be reached.
        """
        try:
            self.session = connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, load_balancing_policy=DCAwareRoundRobinPolicy(), protocol_version=4)
        except NoHostAvailable:
            # The class has no ``log`` attribute; use the module level logger
            # so the real problem is reported instead of an AttributeError.
            log.error("Not able to connect to any of the Cassandra contact points")
            raise StorageError
class CassandraTokenStorage(object):
    """Push token store backed by the ``PushTokens`` cqlengine model."""

    @run_in_thread('cassandra')
    def load(self):
        """Make sure the Cassandra connection object has been created."""
        CassandraConnection()

    def __getitem__(self, key):
        """Return a Deferred firing with the devices of account ``key``.

        ``key`` has the form ``user@domain``.  The result maps
        ``'<app_id>-<device_id>'`` to a dict describing the device.
        """
        deferred = defer.Deferred()

        @run_in_thread('cassandra')
        def query_tokens(key):
            username, domain = key.split('@', 1)
            tokens = {}
            for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain):
                # The column is written and filtered as ``device_token`` in
                # add(), so it is read back under the same name here.
                tokens[f'{device.app_id}-{device.device_id}'] = {'device_id': device.device_id, 'token': device.device_token,
                                                                 'platform': device.platform, 'silent': device.silent,
                                                                 'app_id': device.app_id, 'background_token': device.background_token}
            deferred.callback(tokens)
            return tokens

        query_tokens(key)
        return deferred

    @run_in_thread('cassandra')
    def add(self, account, contact_params, user_agent):
        """Create the ``PushTokens`` row for one device of ``account``.

        ``contact_params`` must provide ``pn_tok``, ``pn_device``, ``pn_type``,
        ``pn_silent`` and ``pn_app``.  Raises StorageError when the row cannot
        be created.
        """
        username, domain = account.split('@', 1)

        try:
            (token, background_token) = contact_params['pn_tok'].split('-')
        except ValueError:
            token = contact_params['pn_tok']
            background_token = None

        # Remove old unsplit token if exists, can be removed if all tokens are stored split
        if background_token is not None:
            try:
                PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == contact_params['pn_tok']).if_exists().delete()
            except LWTException:
                pass

        try:
            PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'],
                              device_token=token, background_token=background_token, platform=contact_params['pn_type'],
                              silent=contact_params['pn_silent'], app_id=contact_params['pn_app'], user_agent=user_agent)
        except (CQLEngineException, InvalidRequest) as e:
            # The class has no ``logger`` attribute; use the module level
            # logger so the StorageError below is actually reached.
            log.error(f'Storing token failed: {e}')
            raise StorageError

    @run_in_thread('cassandra')
    def remove(self, account, app_id, device_id):
        """Delete the row of device ``app_id``/``device_id`` for ``account`` if it exists."""
        username, domain = account.split('@', 1)
        try:
            PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain,
                               PushTokens.device_id == device_id, PushTokens.app_id == app_id).if_exists().delete()
        except LWTException:
            pass
class FileMessageStorage(object):
    """Chat account and message store backed by JSON files.

    Everything lives below ``<spool_dir>/conversations``:

    * ``accounts.json`` - account data and API tokens
    * ``public_keys.json`` - last public key stored per contact
    * ``<c>/<account>_messages.json`` - list of message dicts, ``<c>`` being
      the first character of the account
    * ``<c>/<account>_id_timestamp.json`` - ``message_id -> created_at`` index
    """

    def __init__(self):
        self._public_keys = defaultdict()
        self._accounts = defaultdict()
        self._storage_path = os.path.join(ServerConfig.spool_dir, 'conversations')

    def _json_dateconverter(self, o):
        """``json.dump`` fallback that renders datetime objects as text.

        Any other unsupported object yields None and is written as ``null``.
        """
        if isinstance(o, datetime.datetime):
            return o.__str__()

    @run_in_thread('file-io')
    def _save(self):
        """Write the account and public key mappings to their JSON files."""
        with open(os.path.join(self._storage_path, 'accounts.json'), 'w+') as f:
            json.dump(self._accounts, f, default=self._json_dateconverter)
        with open(os.path.join(self._storage_path, 'public_keys.json'), 'w+') as f:
            json.dump(self._public_keys, f, default=self._json_dateconverter)

    def _save_messages(self, account, messages):
        """Write the complete message list of ``account``."""
        with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'w+') as f:
            json.dump(messages, f, default=self._json_dateconverter, indent=4)

    def _save_id_by_timestamp(self, account, ids):
        """Write the ``message_id -> created_at`` index of ``account``."""
        with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'w+') as f:
            json.dump(ids, f, default=self._json_dateconverter)

    @run_in_thread('file-io')
    def load(self):
        """Create the storage directory and read back the saved mappings.

        Both files written by _save() are loaded; skipping public_keys.json
        would let the next _save() overwrite the keys stored on disk.
        """
        makedirs(self._storage_path)
        for filename, target in (('accounts.json', self._accounts), ('public_keys.json', self._public_keys)):
            try:
                with open(os.path.join(self._storage_path, filename), 'r') as f:
                    data = json.load(f)
            except (OSError, IOError):
                # Nothing saved yet
                continue
            except ValueError as e:
                log.error(f'Loading {filename} failed: {e}')
                continue
            target.update(data)

    def _load_id_by_timestamp(self, account):
        """Return the ``message_id -> created_at`` index; OSError propagates if the file cannot be opened."""
        with open(os.path.join(self._storage_path, account[0], f'{account}_id_timestamp.json'), 'r') as f:
            return json.load(f)

    def _load_messages(self, account):
        """Return the message list of ``account``; OSError propagates if the file cannot be opened."""
        with open(os.path.join(self._storage_path, account[0], f'{account}_messages.json'), 'r') as f:
            return json.load(f)

    def __getitem__(self, key):
        """Return a Deferred firing with the stored messages of an account.

        ``key`` is ``(account, message_id)``.  With a ``message_id`` only the
        messages created after that message are returned, and an unknown id
        yields an empty list.  With ``None`` all messages are returned.
        """
        deferred = defer.Deferred()

        @run_in_thread('file-io')
        def query(account, message_id):
            messages = []
            timestamp = None

            try:
                id_by_timestamp = self._load_id_by_timestamp(account)
            except (OSError, IOError):
                deferred.callback(messages)
                return

            if message_id is not None:
                try:
                    timestamp = id_by_timestamp[message_id]
                except KeyError:
                    deferred.callback(messages)
                    return
                else:
                    timestamp = ISOTimestamp(timestamp)

            try:
                messages = self._load_messages(account)
            except (OSError, IOError):
                deferred.callback(messages)
                return

            if timestamp is not None:
                messages = [message for message in messages if ISOTimestamp(message['created_at']) > timestamp]
            deferred.callback(messages)

        query(key[0], key[1])
        return deferred

    def get_account(self, account):
        """Return the stored data of ``account`` as a SimpleNamespace, or None if unknown."""
        try:
            return SimpleNamespace(account=account, **self._accounts[account])
        except KeyError:
            return None

    def get_account_token(self, account):
        """Return the API token of ``account`` or None when missing or expired."""
        try:
            expire = self._accounts[account]['token_expire']
        except KeyError:
            return None

        # The expiry is a datetime while it only lives in memory and a string
        # once it went through the JSON file; fromisoformat() accepts only the
        # latter.
        if isinstance(expire, str):
            expire = datetime.datetime.fromisoformat(expire)

        if datetime.datetime.now() < expire:
            try:
                return self._accounts[account]['api_token']
            except KeyError:
                return None
        return None

    def add_account(self, account):
        """Register ``account`` with the current time as ``last_login`` unless it is already known."""
        timestamp = datetime.datetime.now()

        if account not in self._accounts:
            self._accounts[account] = {'last_login': timestamp}
            self._save()

    def add_account_token(self, account, token):
        """Store ``token`` as the API token of ``account``, valid for 31 days."""
        timestamp = datetime.datetime.now()
        if account not in self._accounts:
            log.error(f'Updating API token for {account} failed')
            # NOTE(review): this hands back the exception class instead of
            # raising it; kept as is because callers may rely on the method
            # not raising - confirm and switch to ``raise`` if appropriate.
            return StorageError
        self._accounts[account]['api_token'] = token
        # 31 days, the same lifetime as the TTL (2678400 seconds) used by the
        # Cassandra backend; the value used before was ten times larger.
        self._accounts[account]['token_expire'] = timestamp + datetime.timedelta(days=31)
        self._save()

    @run_in_thread('file-io')
    def update(self, account, state, message_id):
        """Apply a new delivery ``state`` to the stored message ``message_id``.

        Messages in state 'received' keep their state.  'delivered' drops the
        'positive-delivery' disposition, 'displayed' clears all dispositions.
        Nothing happens when the files or the message cannot be found.
        """
        try:
            messages = self._load_messages(account)
            id_by_timestamp = self._load_id_by_timestamp(account)
        except (OSError, IOError):
            return

        try:
            timestamp = id_by_timestamp[message_id]
        except KeyError:
            return

        for message in messages:
            if message['created_at'] != timestamp or message['message_id'] != message_id:
                continue
            if message['state'] != 'received':
                message['state'] = state
            if state == 'delivered':
                try:
                    message['disposition'].remove('positive-delivery')
                except ValueError:
                    pass
            elif state == 'displayed':
                message['disposition'] = []
            self._save_messages(account, messages)
            break

    @run_in_thread('file-io')
    def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None):
        """Store a message for ``account``.

        ``timestamp`` is the ISO formatted time of the message itself and is
        kept as ``msg_timestamp``; ``created_at`` is the time of storing.
        IMDN payloads for an already indexed id and messages that are already
        stored are skipped.  Public keys ('text/pgp-public-key') are kept in
        the public key mapping instead of the message list.
        """
        try:
            msg_timestamp = datetime.datetime.fromisoformat(timestamp)
        except ValueError:
            msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
        timestamp = datetime.datetime.now()

        messages = []
        id_by_timestamp = {}

        try:
            messages = self._load_messages(account)
        except (OSError, IOError):
            # First message for this account, make sure its directory exists
            makedirs(os.path.join(self._storage_path, account[0]))

        try:
            id_by_timestamp = self._load_id_by_timestamp(account)
        except (OSError, IOError):
            pass
        else:
            if message_id in id_by_timestamp:
                created_at = id_by_timestamp[message_id]
                if content_type == 'message/imdn+json':
                    return

                items = [n for n in messages if n['created_at'] == created_at and n['message_id'] == message_id]
                if len(items) == 1:
                    return

        if content_type == 'text/pgp-public-key':
            self._public_keys[contact] = {'content': content, 'created_at': timestamp}
            self._save()
            return

        if not isinstance(disposition_notification, list) and disposition_notification == '':
            disposition_notification = []

        message = {'account': account,
                   'direction': direction,
                   'contact': contact,
                   'content_type': content_type,
                   'content': content,
                   'created_at': timestamp,
                   'message_id': message_id,
                   'disposition': disposition_notification,
                   'state': state,
                   'msg_timestamp': msg_timestamp}
        messages.append(message)

        self._save_messages(account, messages)

        id_by_timestamp[message_id] = timestamp
        self._save_id_by_timestamp(account, id_by_timestamp)

    @run_in_thread('file-io')
    def removeChat(self, account, contact):
        """Remove every stored message of ``account`` exchanged with ``contact``."""
        try:
            messages = self._load_messages(account)
        except (OSError, IOError):
            # Nothing stored for this account, so there is nothing to remove
            return

        # Build a new list instead of removing while iterating, which skips
        # the element following each removed one.
        remaining = [message for message in messages if message['contact'] != contact]
        self._save_messages(account, remaining)

    @run_in_thread('file-io')
    def removeMessage(self, account, message_id):
        """Remove the message ``message_id`` of ``account`` if exactly one stored entry matches."""
        try:
            id_by_timestamp = self._load_id_by_timestamp(account)
            timestamp = id_by_timestamp[message_id]
            messages = self._load_messages(account)
        except (OSError, IOError, KeyError):
            return

        item = [n for n in messages if n['created_at'] == timestamp and n['message_id'] == message_id]
        if len(item) == 1:
            messages.remove(item[0])
            self._save_messages(account, messages)
+
class CassandraMessageStorage(object):
    """Chat account and message store backed by the cqlengine chat models.

    Uses ``ChatAccount``, ``ChatMessage``, ``ChatMessageIdMapping`` and
    ``PublicKey``; all database work runs in the 'cassandra' thread.
    """

    @run_in_thread('cassandra')
    def load(self):
        """Make sure the Cassandra connection object has been created."""
        CassandraConnection()

    def __getitem__(self, key):
        """Return a Deferred firing with the messages of an account.

        ``key`` is ``(account, message_id)``.  Messages created after the
        given message are returned; when the id cannot be resolved the
        messages of the last 3 days are returned instead.
        """
        deferred = defer.Deferred()

        @run_in_thread('cassandra')
        def query_messages(account, message_id):
            try:
                mapping = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
            except (IndexError, InvalidRequest):
                timestamp = datetime.datetime.now() - datetime.timedelta(days=3)
            else:
                timestamp = mapping.created_at
            messages = list(ChatMessage.objects(ChatMessage.account == account, ChatMessage.created_at > timestamp))
            deferred.callback(messages)

        query_messages(key[0], key[1])
        return deferred

    def get_account(self, account):
        """Return a Deferred firing with the ``ChatAccount`` row of ``account`` or None."""
        deferred = defer.Deferred()

        @run_in_thread('cassandra')
        def query_account(account):
            try:
                chat_account = ChatAccount.objects(ChatAccount.account == account)[0]
            except (IndexError, InvalidRequest):
                deferred.callback(None)
            else:
                deferred.callback(chat_account)

        query_account(account)
        return deferred

    def get_account_token(self, account):
        """Return a Deferred firing with the API token of ``account`` or None if the account is not found."""
        deferred = defer.Deferred()

        @run_in_thread('cassandra')
        def query_token(account):
            try:
                chat_account = ChatAccount.objects(ChatAccount.account == account)[0]
            except (IndexError, InvalidRequest):
                deferred.callback(None)
            else:
                deferred.callback(chat_account.api_token)

        query_token(account)
        return deferred

    @run_in_thread('cassandra')
    def add_account(self, account):
        """Create the ``ChatAccount`` row of ``account`` with the current time as ``last_login``.

        Raises StorageError when the row cannot be created.
        """
        timestamp = datetime.datetime.now()

        try:
            ChatAccount.create(account=account, last_login=timestamp)
        except (CQLEngineException, InvalidRequest) as e:
            log.error(f'Storing account failed: {e}')
            raise StorageError

    @run_in_thread('cassandra')
    def add_account_token(self, account, token):
        """Store ``token`` as the API token of ``account`` with a TTL of 2678400 seconds (31 days).

        Raises StorageError when the account does not exist.
        """
        try:
            chat_account = ChatAccount.objects(account=account)[0]
            chat_account.ttl(2678400).update(api_token=token)
        except IndexError:
            log.error(f'Updating API token for {account} failed')
            raise StorageError

    @run_in_thread('cassandra')
    def update(self, account, state, message_id):
        """Apply a new delivery ``state`` to the stored message ``message_id``.

        Messages in state 'received' keep their state.  'delivered' drops the
        'positive-delivery' disposition, 'displayed' clears all dispositions.
        Nothing happens when the message cannot be found.
        """
        try:
            mapping = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
            message = ChatMessage.objects(ChatMessage.account == account,
                                          ChatMessage.created_at == mapping.created_at,
                                          ChatMessage.message_id == message_id)[0]
        except IndexError:
            return

        if message.state != 'received':
            message.state = state
        if state == 'delivered':
            try:
                message.disposition.remove('positive-delivery')
            except ValueError:
                pass
        elif state == 'displayed':
            message.disposition.clear()
        message.save()

    @run_in_thread('cassandra')
    def add(self, account, contact, direction, content, content_type, timestamp, disposition_notification, message_id, state=None):
        """Store a message for ``account``.

        ``timestamp`` is the ISO formatted time of the message itself and is
        kept as ``msg_timestamp``; ``created_at`` is the time of storing.
        IMDN payloads for an already mapped id and messages that are already
        stored are skipped.  Public keys ('text/pgp-public-key') go to the
        ``PublicKey`` model instead of ``ChatMessage``.  Raises StorageError
        when a row cannot be created.
        """
        try:
            msg_timestamp = datetime.datetime.fromisoformat(timestamp)
        except ValueError:
            msg_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
        timestamp = datetime.datetime.now()

        try:
            mapping = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
        except IndexError:
            pass
        else:
            if content_type == 'message/imdn+json':
                return
            if ChatMessage.objects(ChatMessage.account == account,
                                   ChatMessage.created_at == mapping.created_at,
                                   ChatMessage.message_id == message_id).count() != 0:
                return

        if content_type == 'text/pgp-public-key':
            try:
                PublicKey.create(account=contact, public_key=content, created_at=timestamp)
                # The mapping column is called ``created_at`` everywhere else
                # in this class (it was passed as ``time_stamp`` here before).
                ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id)
            except (CQLEngineException, InvalidRequest) as e:
                log.error(f'Storing public key failed: {e}')
                raise StorageError
            return

        try:
            ChatMessage.create(account=account, direction=direction, contact=contact, content_type=content_type,
                               content=content, created_at=timestamp, message_id=message_id,
                               disposition=disposition_notification, state=state, msg_timestamp=msg_timestamp)
            ChatMessageIdMapping.create(created_at=timestamp, message_id=message_id)
        except (CQLEngineException, InvalidRequest) as e:
            log.error(f'Storing message failed: {e}')
            raise StorageError

    @run_in_thread('cassandra')
    def removeChat(self, account, contact):
        """Delete every stored message of ``account`` exchanged with ``contact``."""
        try:
            messages = ChatMessage.objects(ChatMessage.account == account)
        except LWTException:
            pass
        else:
            for message in messages:
                if message.contact == contact:
                    message.delete()

    @run_in_thread('cassandra')
    def removeMessage(self, account, message_id):
        """Delete the message ``message_id`` of ``account`` if it exists."""
        try:
            mapping = ChatMessageIdMapping.objects(ChatMessageIdMapping.message_id == message_id)[0]
        except IndexError:
            return

        try:
            ChatMessage.objects(ChatMessage.account == account,
                                ChatMessage.created_at == mapping.created_at,
                                ChatMessage.message_id == message_id).if_exists().delete()
        except LWTException:
            pass
+
+
class TokenStorage(object, metaclass=Singleton):
    """Facade that instantiates the push token backend selected by the configuration."""

    def __new__(cls):
        # Cassandra is used only when its modules could be imported and
        # contact points are configured; otherwise tokens are kept in a file.
        use_cassandra = CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points
        backend = CassandraTokenStorage if use_cassandra else FileTokenStorage
        return backend()
+
+
class MessageStorage(object, metaclass=Singleton):
    """Facade that instantiates the message backend selected by the configuration."""

    def __new__(cls):
        # Cassandra is used only when its modules could be imported and
        # contact points are configured; otherwise messages are kept in files.
        use_cassandra = CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points
        backend = CassandraMessageStorage if use_cassandra else FileMessageStorage
        return backend()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:54 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408913
Default Alt Text
(23 KB)
Attached To
Mode
rSYLK SylkServer
Attached
Detach File
Event Timeline
Log In to Comment