diff --git a/sylk/applications/webrtcgateway/push.py b/sylk/applications/webrtcgateway/push.py index 3ae0c78..aacc897 100644 --- a/sylk/applications/webrtcgateway/push.py +++ b/sylk/applications/webrtcgateway/push.py @@ -1,84 +1,80 @@ import json from twisted.internet import defer, reactor from twisted.web.client import Agent, readBody from twisted.web.iweb import IBodyProducer from twisted.web.http_headers import Headers from zope.interface import implementer from .configuration import GeneralConfig from .logger import log from .models import sylkpush from .storage import TokenStorage __all__ = 'conference_invite' agent = Agent(reactor) headers = Headers({'User-Agent': ['SylkServer'], 'Content-Type': ['application/json']}) @implementer(IBodyProducer) class StringProducer(object): def __init__(self, data): self.body = data self.length = len(data) def startProducing(self, consumer): consumer.write(self.body) return defer.succeed(None) def pauseProducing(self): pass def stopProducing(self): pass def _construct_and_send(result, request, destination): for device_token, push_parameters in result.iteritems(): request.token = device_token request.app_id = push_parameters['app'] request.platform = push_parameters['platform'] request.device_id = push_parameters['device_id'] _send_push_notification(json.dumps(request), destination) def conference_invite(originator, destination, room, call_id): tokens = TokenStorage() request = sylkpush.ConferenceInviteEvent(token='dummy', app_id='dummy', platform='dummy', device_id='dummy', originator=originator.uri, from_display_name=originator.display_name, to=room, call_id=str(call_id)) user_tokens = tokens[destination] if isinstance(user_tokens, set): return else: if isinstance(user_tokens, defer.Deferred): user_tokens.addCallback(lambda result: _construct_and_send(result, request, destination)) else: _construct_and_send(user_tokens, request, destination) @defer.inlineCallbacks def _send_push_notification(payload, destination): if GeneralConfig.sylk_push_url: - try: - payload.token = payload.token.split('#')[0] - except IndexError: - pass try: r = yield agent.request('POST', GeneralConfig.sylk_push_url, headers, StringProducer(payload.__data__)) except Exception as e: log.info('Error sending push notification to %s: %s', GeneralConfig.sylk_push_url, e) else: if r.code != 200: if r.code == 410: log.info("Token expired, purging old token from storage") tokens = TokenStorage() tokens.remove(destination, payload.token) else: log.warning('Error sending push notification: %s', r.phrase) else: log.debug('Sent push notification: %s', payload) else: log.warning('Cannot send push notification: no Sylk push server configured') diff --git a/sylk/applications/webrtcgateway/storage.py b/sylk/applications/webrtcgateway/storage.py index b2a079b..60e2f90 100644 --- a/sylk/applications/webrtcgateway/storage.py +++ b/sylk/applications/webrtcgateway/storage.py @@ -1,146 +1,172 @@ import cPickle as pickle import os from application.python.types import Singleton from collections import defaultdict from sipsimple.threading import run_in_thread from twisted.internet import defer from sylk.configuration import ServerConfig from .configuration import CassandraConfig __all__ = 'TokenStorage', # TODO: Maybe add some more metadata like the modification date so we know when a token was refreshed, # and thus it's ok to scrap it after a reasonable amount of time. CASSANDRA_MODULES_AVAILABLE = False try: from cassandra.cqlengine import columns, connection except ImportError: pass else: try: from cassandra.cqlengine.models import Model except ImportError: pass else: CASSANDRA_MODULES_AVAILABLE = True from cassandra.cqlengine.query import LWTException class PushTokens(Model): - username = columns.Text(partition_key=True) - domain = columns.Text(partition_key=True) - device_id = columns.Text() - app = columns.Text() - device_token = columns.Text(primary_key=True) - platform = columns.Text() - silent = columns.Text() - user_agent = columns.Text(required=False) + username = columns.Text(partition_key=True) + domain = columns.Text(partition_key=True) + device_id = columns.Text() + app = columns.Text() + background_token = columns.Text(required=False) + device_token = columns.Text(primary_key=True) + platform = columns.Text() + silent = columns.Text() + user_agent = columns.Text(required=False) class FileStorage(object): def __init__(self): self._tokens = defaultdict() @run_in_thread('file-io') def _save(self): with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f: pickle.dump(self._tokens, f) @run_in_thread('file-io') def load(self): try: tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb')) except Exception: pass else: self._tokens.update(tokens) def __getitem__(self, key): try: return self._tokens[key] except KeyError: return {} def add(self, account, contact_params, user_agent): + try: + (token, background_token) = contact_params['pn_tok'].split('#') + except IndexError: + token = contact_params['pn_tok'] + background_token = None data = { 'device_id': contact_params['pn_device'], 'platform': contact_params['pn_type'], 'silent': contact_params['pn_silent'], 'app': contact_params['pn_app'], - 'user_agent': user_agent + 'user_agent': user_agent, + 'background_token': background_token } if account in self._tokens: if isinstance(self._tokens[account], set): self._tokens[account] = {} # Remove old storage layout based on device id if contact_params['pn_device'] in self._tokens[account]: del self._tokens[account][contact_params['pn_device']] - self._tokens[account][contact_params['pn_tok']] = data + + # Remove old unsplit token if exists, can be removed if all tokens are stored split + if background_token is not None: + try: + del self._tokens[account][contact_params['pn_tok']] + except IndexError: + pass + self._tokens[account][token] = data else: - self._tokens[account] = {contact_params['pn_tok']: data} + self._tokens[account] = {token: data} self._save() def remove(self, account, device_token): try: device_token = device_token.split('#')[0] except IndexError: pass try: del self._tokens[account][device_token] except KeyError: pass self._save() class CassandraStorage(object): @run_in_thread('cassandra') def load(self): connection.setup(CassandraConfig.cluster_contact_points, CassandraConfig.keyspace, protocol_version=4) def __getitem__(self, key): deferred = defer.Deferred() @run_in_thread('cassandra') def query_tokens(key): username, domain = key.split('@', 1) tokens = {} for device in PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain): tokens[device.device_token] = {'device_id': device.device_id, 'platform': device.platform, 'silent': device.silent, 'app': device.app} deferred.callback(tokens) return tokens query_tokens(key) return deferred @run_in_thread('cassandra') def add(self, account, contact_params, user_agent): username, domain = account.split('@', 1) + try: + (token, background_token) = contact_params['pn_tok'].split('#') + except IndexError: + token = contact_params['pn_tok'] + background_token = None + + # Remove old unsplit token if exists, can be removed if all tokens are stored split + if background_token is not None: + try: + PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == contact_params['pn_tok']).if_exists().delete() + except LWTException: + pass PushTokens.create(username=username, domain=domain, device_id=contact_params['pn_device'], - device_token=contact_params['pn_tok'], platform=contact_params['pn_type'], + device_token=token, background_token=background_token, platform=contact_params['pn_type'], silent=contact_params['pn_silent'], app=contact_params['pn_app'], user_agent=user_agent) @run_in_thread('cassandra') def remove(self, account, device_token): username, domain = account.split('@', 1) try: device_token = device_token.split('#')[0] except IndexError: pass try: PushTokens.objects(PushTokens.username == username, PushTokens.domain == domain, PushTokens.device_token == device_token).if_exists().delete() except LWTException: pass class TokenStorage(object): __metaclass__ = Singleton def __new__(self): if CASSANDRA_MODULES_AVAILABLE and CassandraConfig.cluster_contact_points: return CassandraStorage() else: return FileStorage()