diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py index 84376d9..dfbc358 100644 --- a/sylk/applications/webrtcgateway/__init__.py +++ b/sylk/applications/webrtcgateway/__init__.py @@ -1,238 +1,254 @@ import json import random import secrets import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, FromHeader, ToHeader, Message, RouteHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.imdn import IMDNDocument from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import defer from zope.interface import implementer from sylk.applications import SylkApplication from sylk.configuration import SIPConfig from sylk.web import server from .configuration import GeneralConfig from .logger import log from .models import sylkrtc from .storage import TokenStorage, MessageStorage from .web import WebHandler, AdminWebHandler - +from . import push @implementer(IObserver) class WebRTCGatewayApplication(SylkApplication): def __init__(self): self.web_handler = WebHandler() self.admin_web_handler = AdminWebHandler() self.resolver = DNSLookup() def start(self): self.web_handler.start() self.admin_web_handler.start() # Load tokens from the storage token_storage = TokenStorage() token_storage.load() # Setup message storage message_storage = MessageStorage() message_storage.load() def stop(self): self.web_handler.stop() self.admin_web_handler.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def incoming_session(self, session): log.debug('New incoming session {session.call_id} from sip:{uri.user}@{uri.host} rejected'.format(session=session, uri=session.remote_identity.uri)) session.reject(403) def incoming_subscription(self, request, data): request.answer(405) def incoming_referral(self, request, data): request.answer(405) def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return if content_type == 'message/cpim': try: cpim_message = CPIMPayload.decode(data.body) except CPIMParserError: log.warning('SIP message from %s to %s rejected: CPIM parse error' % (from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(400) return else: content_type = cpim_message.content_type log.info('received SIP message (%s) from %s to %s' % (content_type, from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_storage = MessageStorage() if content_type == 'application/sylk-api-token': account = f'{from_header.uri.user}@{from_header.uri.host}' message_storage.add_account(account) message_request.answer(200) token = secrets.token_urlsafe() self._send_sip_message(from_header.uri, json.dumps({'token': token, 'url': f'{server.url}/webrtcgateway/messages/history/{account}'}), 'application/sylk-api-token') message_storage.add_account_token(account=account, token=token) return account = message_storage.get_account(f'{to_header.uri.user}@{to_header.uri.host}') if isinstance(account, defer.Deferred): account.addCallback(lambda result: self.check_account(result, message_request, data)) else: self.check_account(account, message_request, data) def check_account(self, account, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) if account is not None: log.debug(f'processing SIP message from {from_header.uri} to {account.account}') message_request.answer(200) cpim_message = None if content_type == "message/cpim": cpim_message = CPIMPayload.decode(data.body) body = cpim_message.content if isinstance(cpim_message.content, str) else cpim_message.content.decode() content_type = cpim_message.content_type sender = cpim_message.sender or from_header disposition = next(([item.strip() for item in header.value.split(',')] for header in cpim_message.additional_headers if header.name == 'Disposition-Notification'), None) message_id = next((header.value for header in cpim_message.additional_headers if header.name == 'Message-ID'), str(uuid.uuid4())) else: body = data.body.decode('utf-8') sender = from_header disposition = None message_id = str(uuid.uuid4()) timestamp = str(cpim_message.timestamp) if cpim_message is not None and cpim_message.timestamp is not None else str(ISOTimestamp.now()) sender = sylkrtc.SIPIdentity(uri=str(sender.uri), display_name=sender.display_name) message = None notification_center = NotificationCenter() if content_type == "application/im-iscomposing+xml": return if content_type == IMDNDocument.content_type: document = IMDNDocument.parse(body) imdn_message_id = document.message_id.value imdn_status = document.notification.status.__str__() imdn_datetime = document.datetime.__str__() log.debug('storing IMDN message ({status}) from: {originator.uri}'.format(status=imdn_status, originator=sender)) storage = MessageStorage() storage.update(account=account.account, state=imdn_status, message_id=imdn_message_id) storage.update(account=str(sender.uri), state=imdn_status, message_id=imdn_message_id) message = sylkrtc.AccountDispositionNotificationEvent(account=account.account, state=imdn_status, message_id=imdn_message_id, message_timestamp=imdn_datetime, timestamp=timestamp, code=200, reason='') imdn_message_event = message.__data__ # del imdn_message_event['account'] storage.add(account=account.account, contact=sender.uri, direction="incoming", content=json.dumps(imdn_message_event), content_type='message/imdn', timestamp=timestamp, disposition_notification='', message_id=message_id, state='received') notification_center.post_notification(name='SIPApplicationGotAccountDispositionNotification', sender=account.account, data=NotificationData(message=message, sender=sender)) else: log.debug('storing SIP message ({content_type}) from: {originator.uri} to {account}'.format(content_type=content_type, originator=sender, account=account.account)) storage = MessageStorage() storage.add(account=account.account, contact=str(sender.uri), direction="incoming", content=body, content_type=content_type, timestamp=timestamp, disposition_notification=disposition, message_id=message_id, state='received') message = sylkrtc.AccountMessageEvent(account=account.account, sender=sender, content=body, content_type=content_type, timestamp=timestamp, disposition_notification=disposition, message_id=message_id) notification_center.post_notification(name='SIPApplicationGotAccountMessage', sender=account.account, data=message) + + if content_type == 'text/plain' or content_type == 'text/html': + + def get_unread_messages(messages, originator): + unread = 1 + for message in messages: + if ((message.content_type == 'text/plain' or message.content_type == 'text/html') + and message.direction == 'incoming' and message.contact != account.account + and 'display' in message.disposition): + unread = unread + 1 + push.message(originator=originator, destination=account.account, call_id=str(uuid.uuid4()), badge=unread) + + messages = storage[[account.account, '']] + if isinstance(messages, defer.Deferred): + messages.addCallback(lambda result: get_unread_messages(messages=result, originator=message.sender)) + return to_header = data.headers.get('To', Null) log.info('rejecting SIP %s message from %s to %s, account not found in storage' % (content_type, from_header.uri, '%s@%s' % (to_header.uri.user, to_header.uri.host))) message_request.answer(404) def _lookup_sip_target_route(self, uri): proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) log.debug('DNS lookup for SIP message proxy for {} yielded {}'.format(uri, route)) return route @run_in_green_thread def _send_sip_message(self, uri, content, content_type='text/plain'): route = self._lookup_sip_target_route(uri) sip_uri = SIPURI.parse('%s' % uri) if route: identity = f'sylkserver@{SIPConfig.local_ip}' log.debug("sending message from '%s' to '%s' using proxy %s" % (identity, uri, route)) from_uri = SIPURI.parse(f'sip:{identity}') content = content if isinstance(content, bytes) else content.encode() message_request = Message(FromHeader(from_uri, 'SylkServer'), ToHeader(sip_uri), RouteHeader(route.uri), content_type, content) notification_center = NotificationCenter() notification_center.add_observer(self, sender=message_request) message_request.send() def _NH_SIPMessageDidSucceed(self, notification): log.info('message was accepted by remote party') def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) data = notification.data reason = data.reason.decode() if isinstance(data.reason, bytes) else data.reason log.warning('could not deliver message %d %s' % (data.code, reason)) diff --git a/sylk/applications/webrtcgateway/models/sylkpush.py b/sylk/applications/webrtcgateway/models/sylkpush.py index d8665ac..29ea955 100644 --- a/sylk/applications/webrtcgateway/models/sylkpush.py +++ b/sylk/applications/webrtcgateway/models/sylkpush.py @@ -1,42 +1,56 @@ from .jsonobjects import IntegerProperty, StringProperty, FixedValueProperty, LimitedChoiceProperty from .jsonobjects import JSONObject # Event base classes (abstract, should not be used directly) class SylkRTCEventBase(JSONObject): platform = StringProperty(optional=True) app_id = StringProperty(optional=True) token = StringProperty() device_id = StringProperty(optional=True) silent = IntegerProperty(optional=True, default=0) call_id = StringProperty() def __init__(self, **kw): super(SylkRTCEventBase, self).__init__(**kw) class CallEventBase(SylkRTCEventBase): event = None # specified by subclass originator = StringProperty() from_display_name = StringProperty(optional=True, default=None) - media_type = LimitedChoiceProperty(['audio', 'video']) + media_type = LimitedChoiceProperty(['audio', 'video', 'sms']) # Events to use used in a SylkPushRequest class ConferenceInviteEvent(CallEventBase): event = FixedValueProperty('incoming_conference_request') to = StringProperty() @property def __data__(self): data = super(ConferenceInviteEvent, self).__data__ for key in data: # Fixup keys data[key.replace('_', '-')] = data.pop(key) data['from'] = data.pop('originator') return data + +class MessageEvent(CallEventBase): + event = FixedValueProperty('message') + to = StringProperty() + badge = IntegerProperty(default=1) + + @property + def __data__(self): + data = super(MessageEvent, self).__data__ + for key in list(data): + # Fixup keys + data[key.replace('_', '-')] = data.pop(key) + data['from'] = data.pop('originator') + return data diff --git a/sylk/applications/webrtcgateway/push.py b/sylk/applications/webrtcgateway/push.py index 0a20907..ffdbd8b 100644 --- a/sylk/applications/webrtcgateway/push.py +++ b/sylk/applications/webrtcgateway/push.py @@ -1,130 +1,147 @@ import json from twisted.internet import defer, reactor from twisted.web.client import Agent, readBody from twisted.web.iweb import IBodyProducer from twisted.web.http_headers import Headers from zope.interface import implementer from .configuration import GeneralConfig from .logger import log from .models import sylkpush from .storage import TokenStorage __all__ = 'conference_invite' agent = Agent(reactor) headers = Headers({'User-Agent': ['SylkServer'], 'Content-Type': ['application/json']}) @implementer(IBodyProducer) class BytesProducer(object): def __init__(self, data): self.body = data self.length = len(data) def startProducing(self, consumer): consumer.write(self.body) return defer.succeed(None) def pauseProducing(self): pass def stopProducing(self): pass + def _construct_and_send(result, request, destination): for device, push_parameters in result.items(): request.token = push_parameters['token'] + if isinstance(request, sylkpush.MessageEvent) and push_parameters['background_token']: + request.token = push_parameters['background_token'] request.app_id = push_parameters['app_id'] request.platform = push_parameters['platform'] request.device_id = push_parameters['device_id'] _send_push_notification(request, destination, request.token) def conference_invite(originator, destination, room, call_id, audio, video): tokens = TokenStorage() if video: media_type = 'video' else: media_type = 'audio' request = sylkpush.ConferenceInviteEvent(token='dummy', app_id='dummy', platform='dummy', device_id='dummy', originator=originator.uri, from_display_name=originator.display_name, to=room, call_id=str(call_id), media_type=media_type) user_tokens = tokens[destination] if isinstance(user_tokens, set): return else: if isinstance(user_tokens, defer.Deferred): user_tokens.addCallback(lambda result: _construct_and_send(result, request, destination)) else: _construct_and_send(user_tokens, request, destination) +def message(originator, destination, call_id, badge): + tokens = TokenStorage() + media_type = 'sms' + + request = sylkpush.MessageEvent(token='dummy', app_id='dummy', platform='dummy', device_id='dummy', + originator=originator.uri, from_display_name=originator.display_name, to=destination, call_id=str(call_id), + media_type=media_type, badge=badge) + user_tokens = tokens[destination] + if isinstance(user_tokens, set): + return + else: + if isinstance(user_tokens, defer.Deferred): + user_tokens.addCallback(lambda result: _construct_and_send(result, request, destination)) + else: + _construct_and_send(user_tokens, request, destination) @defer.inlineCallbacks def _send_push_notification(payload, destination, token): if GeneralConfig.sylk_push_url: try: r = yield agent.request(b'POST', GeneralConfig.sylk_push_url.encode(), headers, BytesProducer(json.dumps(payload.__data__).encode()) ) except Exception as e: log.info('Error sending push notification to %s: %s', GeneralConfig.sylk_push_url, e) else: try: raw_body = yield readBody(r) except Exception as e: log.warning('Error reading response body: %s', e) else: try: body = json.loads(raw_body) except Exception as e: log.warning('Error parsing response body: %s', e) body = {} try: platform = body['data']['platform'] except KeyError: platform = 'Unknown platform' if r.code != 200: try: reason = body['data']['reason'] except KeyError: reason = None try: details = body['data']['body']['_content']['error']['message'] except KeyError: details = None if reason and details: error_description = "%s %s" % (reason, details) elif reason: error_description = reason else: error_description = body if r.code == 410: if body and 'application/json' in r.headers.getRawHeaders('content-type'): try: token = body['data']['token'] except KeyError: pass else: log.info('Purging expired push token %s/%s' % (destination, token)) tokens = TokenStorage() tokens.remove(destination, payload.app_id, payload.device_id) else: - log.warning('Error sending %s push notification for videoroom to %s/%s: %s (%s) %s %s' % (platform.title(), payload.to, destination, token[:15], r.phrase.decode(), r.code, error_description)) + log.warning('Error sending %s push notification to %s/%s: %s (%s) %s %s' % (platform.title(), payload.to, destination, token[:15], r.phrase.decode(), r.code, error_description)) else: - log.info('Sent %s push notify for videoroom %s to %s/%s' % (platform.title(), payload.to, destination, token[:15])) - + log.info('Sent %s push notify for %s to %s/%s' % (platform.title(), payload.to, destination, token[:15])) else: log.warning('Cannot send push notification: no Sylk push server configured')