Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py
index a0338e1..7ee2443 100644
--- a/sylk/applications/webrtcgateway/__init__.py
+++ b/sylk/applications/webrtcgateway/__init__.py
@@ -1,30 +1,38 @@
from sylk.applications import SylkApplication
from sylk.applications.webrtcgateway.logger import log
from sylk.applications.webrtcgateway.util import IdentityFormatter
from sylk.applications.webrtcgateway.web import WebHandler
+from sylk.applications.webrtcgateway.web.admin import AdminWebHandler
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
class WebRTCGatewayApplication(SylkApplication):
    """Application object that owns the WebRTC gateway web handlers.

    It starts/stops the main web handler together with the admin web handler
    and rejects every incoming session, subscription, referral and message.
    """

    def __init__(self):
        self.web_handler = WebHandler()
        self.admin_web_handler = AdminWebHandler()

    def start(self):
        """Start both handlers (main first), then load the stored tokens."""
        for handler in (self.web_handler, self.admin_web_handler):
            handler.start()
        # Load tokens from storage
        TokenStorage().load()

    def stop(self):
        """Stop both handlers in the same order they were started."""
        for handler in (self.web_handler, self.admin_web_handler):
            handler.stop()

    def incoming_session(self, session):
        caller = IdentityFormatter.format(session.remote_identity)
        log.msg(u'New incoming session %s from %s rejected' % (session.call_id, caller))
        session.reject(403)

    def incoming_subscription(self, request, data):
        request.reject(405)

    def incoming_referral(self, request, data):
        request.reject(405)

    def incoming_message(self, request, data):
        request.reject(405)
diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py
index 40b6871..17dcc00 100644
--- a/sylk/applications/webrtcgateway/configuration.py
+++ b/sylk/applications/webrtcgateway/configuration.py
@@ -1,127 +1,134 @@
import os
import re
from application.configuration import ConfigFile, ConfigSection, ConfigSetting
-from application.configuration.datatypes import StringList
+from application.configuration.datatypes import NetworkAddress, StringList
from sylk.configuration import ServerConfig
from sylk.configuration.datatypes import Path, SIPProxyAddress
from sylk.resources import VarResources
__all__ = ['get_room_config']
# Datatypes
class AccessPolicyValue(str):
    """String restricted to the access policy orderings 'allow,deny' / 'deny,allow'.

    All whitespace is removed from the given value before it is checked, so
    'allow, deny' is accepted and stored as 'allow,deny'.

    Raises ValueError for any other value.
    """

    allowed_values = ('allow,deny', 'deny,allow')

    def __new__(cls, value):
        # raw string: '\s' in a normal literal is an invalid escape sequence
        value = re.sub(r'\s', '', value)
        if value not in cls.allowed_values:
            raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values))
        return str.__new__(cls, value)
class Domain(str):
    """String subclass that only accepts values matching ``domain_re``.

    Raises ValueError when the value does not match.
    """

    domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$")

    def __new__(cls, value):
        text = str(value)
        if cls.domain_re.match(text) is None:
            raise ValueError("illegal domain: %s" % text)
        return str.__new__(cls, text)
class SIPAddress(str):
    """String in user@domain form whose domain part is a valid ``Domain``.

    Every '@' except the last one is rewritten as '%40', so only the final
    '@' separates the user from the domain.

    Raises ValueError when there is no '@' or the domain part is invalid.
    """

    def __new__(cls, address):
        address = str(address)
        # number of '@' characters that are part of the user name
        embedded = address.count('@') - 1
        address = address.replace('@', '%40', embedded)
        error = "illegal SIP address: %s, must be in user@domain format" % address
        username, separator, domain = address.partition('@')
        if not separator:
            raise ValueError(error)
        try:
            Domain(domain)
        except ValueError:
            raise ValueError(error)
        return str.__new__(cls, address)
class PolicySettingValue(list):
    """List of SIP addresses and/or domains used by the allow/deny settings.

    The value may be a list/tuple of items or a comma separated string.
    The strings 'none' and '' (case insensitive) produce an empty list, while
    'any', 'all' and '*' produce the wildcard list ['*']. Items containing
    '@' are stored as SIPAddress, everything else as Domain.

    Raises TypeError for unsupported value types and ValueError (from
    SIPAddress / Domain) for malformed items.
    """

    def __init__(self, value):
        if isinstance(value, (tuple, list)):
            l = [str(x) for x in value]
        elif isinstance(value, basestring):
            lowered = value.lower()
            if lowered in ('none', ''):
                return list.__init__(self, [])
            elif lowered in ('any', 'all', '*'):
                return list.__init__(self, ['*'])
            else:
                l = re.split(r'\s*,\s*', value)
        else:
            raise TypeError("value must be a string, list or tuple")
        values = [SIPAddress(item) if '@' in item else Domain(item) for item in l]
        return list.__init__(self, values)

    def match(self, uri):
        """Return True if uri, or the domain part of uri, is in the list.

        A leading 'sip:' / 'sips:' scheme is ignored. The wildcard list
        ['*'] matches everything.
        """
        if self == ['*']:
            return True
        # Normalise before extracting the domain. The previous code unpacked
        # uri.split("@") on the raw value, which raised ValueError for URIs
        # with no '@' or with more than one.
        uri = re.sub('^(sip:|sips:)', '', str(uri))
        domain = uri.rpartition('@')[2]
        return uri in self or domain in self
class ManagementInterfaceAddress(NetworkAddress):
    """NetworkAddress variant used for the HTTP management interface."""

    # presumably the port NetworkAddress falls back to when none is given -- confirm
    default_port = 20888
+
+
# Configuration objects
class GeneralConfig(ConfigSection):
__cfgfile__ = 'webrtcgateway.ini'
__section__ = 'General'
web_origins = ConfigSetting(type=StringList, value=['*'])
sip_domains = ConfigSetting(type=StringList, value=['*'])
outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None)
trace_websocket = False
websocket_ping_interval = 120
recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings')))
-
-
-class RoomConfig(ConfigSection):
- __cfgfile__ = 'webrtcgateway.ini'
-
- record = False
- access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
- allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
- deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
+ http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1'))
+ http_management_auth_secret = ConfigSetting(type=str, value=None)
+ firebase_server_key = ConfigSetting(type=str, value=None)
class JanusConfig(ConfigSection):
    """Settings read from the 'Janus' section of webrtcgateway.ini."""

    __cfgfile__ = 'webrtcgateway.ini'
    __section__ = 'Janus'

    api_url = 'ws://127.0.0.1:8188'
    # NOTE(review): built-in default secret; presumably meant to be overridden in the config file
    api_secret = '0745f2f74f34451c89343afcdcae5809'
    trace_janus = False
class RoomConfig(ConfigSection):
    """Per-room settings from webrtcgateway.ini.

    No section is bound here; get_room_config() reads the section named
    after the room on demand.
    """

    __cfgfile__ = 'webrtcgateway.ini'

    record = False
    access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
    allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
    deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
+
+
class Configuration(object):
    """Plain attribute container: every key of ``data`` becomes an attribute."""

    def __init__(self, data):
        vars(self).update(data)
def get_room_config(room):
    """Return a Configuration snapshot of the RoomConfig settings for ``room``.

    When the configuration file has a section named after the room, that
    section is read into RoomConfig, a snapshot is taken and RoomConfig is
    reset afterwards. Otherwise the snapshot holds the general policy.
    """
    section = ConfigFile(RoomConfig.__cfgfile__).get_section(room)
    if section is None:
        # Apply general policy
        return Configuration(dict(RoomConfig))
    RoomConfig.read(section=room)
    snapshot = Configuration(dict(RoomConfig))
    RoomConfig.reset()
    return snapshot
diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py
index 1ff432b..934ac42 100644
--- a/sylk/applications/webrtcgateway/models/sylkrtc.py
+++ b/sylk/applications/webrtcgateway/models/sylkrtc.py
@@ -1,191 +1,197 @@
__all__ = ['AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest',
'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest',
'AckResponse', 'ErrorResponse',
'ReadyEvent']
import re
from jsonmodels import models, fields, errors, validators
from sipsimple.core import SIPURI, SIPCoreError
SIP_PREFIX_RE = re.compile('^sips?:')
class DefaultValueField(fields.BaseField):
    """Field with a fixed value: it is both the default and the only valid value."""

    def __init__(self, value):
        self.default_value = value
        super(DefaultValueField, self).__init__()

    def validate(self, value):
        """Raise ValidationError unless value equals the fixed value."""
        if value == self.default_value:
            return
        raise errors.ValidationError('%s doesn\'t match the expected value %s' % (value, self.default_value))

    def get_default_value(self):
        return self.default_value
def URIValidator(value):
    """Validate that value (with or without a sip:/sips: prefix) parses as a SIP URI.

    Raises errors.ValidationError when SIPURI.parse rejects it.
    """
    bare_account = SIP_PREFIX_RE.sub('', value)
    candidate = 'sip:%s' % bare_account
    try:
        SIPURI.parse(candidate)
    except SIPCoreError:
        raise errors.ValidationError('invalid account: %s' % value)
def URIListValidator(values):
    """Run URIValidator over every entry; the first invalid entry raises."""
    for uri in values:
        URIValidator(uri)
class OptionsValidator(object):
    """Callable validator accepting only values contained in ``options``."""

    def __init__(self, options):
        self.options = options

    def __call__(self, value):
        if value in self.options:
            return
        raise errors.ValidationError('invalid option: %s' % value)
# Base models
class SylkRTCRequestBase(models.Base):
    """Base for request models; carries the mandatory transaction field."""

    transaction = fields.StringField(required=True)
class SylkRTCResponseBase(models.Base):
    """Base for response models; carries the mandatory transaction field."""

    transaction = fields.StringField(required=True)
# Miscellaneous models
class AckResponse(SylkRTCResponseBase):
    """Response whose sylkrtc field is fixed to 'ack'."""

    sylkrtc = DefaultValueField('ack')
class ErrorResponse(SylkRTCResponseBase):
    """Response whose sylkrtc field is fixed to 'error', plus an error text."""

    sylkrtc = DefaultValueField('error')
    error = fields.StringField(required=True)
class ICECandidate(models.Base):
    """A single ICE candidate; all three fields are required."""

    candidate = fields.StringField(required=True)
    sdpMLineIndex = fields.IntField(required=True)
    sdpMid = fields.StringField(required=True)
class ReadyEvent(models.Base):
    """Event model with fixed values sylkrtc='event' and event='ready'."""

    sylkrtc = DefaultValueField('event')
    event = DefaultValueField('ready')
# Account models
class AccountRequestBase(SylkRTCRequestBase):
    """Base for account requests; the account must pass URIValidator."""

    account = fields.StringField(required=True,
                                 validators=[URIValidator])
class AccountAddRequest(AccountRequestBase):
    """'account-add' request: password plus optional display name and user agent."""

    sylkrtc = DefaultValueField('account-add')
    # password length is constrained to 1..9999
    password = fields.StringField(required=True,
                                  validators=[validators.Length(minimum_value=1, maximum_value=9999)])
    display_name = fields.StringField(required=False)
    user_agent = fields.StringField(required=False)
class AccountRemoveRequest(AccountRequestBase):
    """'account-remove' request."""

    sylkrtc = DefaultValueField('account-remove')
class AccountRegisterRequest(AccountRequestBase):
    """'account-register' request."""

    sylkrtc = DefaultValueField('account-register')
class AccountUnregisterRequest(AccountRequestBase):
    """'account-unregister' request."""

    sylkrtc = DefaultValueField('account-unregister')
class AccountDeviceTokenRequest(AccountRequestBase):
    """'account-devicetoken' request: a required new token and an optional old one."""

    sylkrtc = DefaultValueField('account-devicetoken')
    old_token = fields.StringField(required=False)
    new_token = fields.StringField(required=True)
+
+
# Session models
class SessionRequestBase(SylkRTCRequestBase):
    """Base for session requests; carries the mandatory session field."""

    session = fields.StringField(required=True)
class SessionCreateRequest(SessionRequestBase):
    """'session-create' request: account, destination uri and SDP, all required."""

    sylkrtc = DefaultValueField('session-create')
    account = fields.StringField(required=True,
                                 validators=[URIValidator])
    uri = fields.StringField(required=True,
                             validators=[URIValidator])
    sdp = fields.StringField(required=True)
class SessionAnswerRequest(SessionRequestBase):
    """'session-answer' request carrying the required SDP."""

    sylkrtc = DefaultValueField('session-answer')
    sdp = fields.StringField(required=True)
class SessionTrickleRequest(SessionRequestBase):
    """'session-trickle' request carrying a list of ICECandidate items."""

    sylkrtc = DefaultValueField('session-trickle')
    candidates = fields.ListField([ICECandidate])
class SessionTerminateRequest(SessionRequestBase):
    """'session-terminate' request."""

    sylkrtc = DefaultValueField('session-terminate')
# VideoRoom models
class VideoRoomRequestBase(SylkRTCRequestBase):
    """Base for video room requests; carries the mandatory session field."""

    session = fields.StringField(required=True)
class VideoRoomJoinRequest(VideoRoomRequestBase):
    """'videoroom-join' request: account, room uri and SDP, all required."""

    sylkrtc = DefaultValueField('videoroom-join')
    account = fields.StringField(required=True,
                                 validators=[URIValidator])
    uri = fields.StringField(required=True,
                             validators=[URIValidator])
    sdp = fields.StringField(required=True)
class VideoRoomControlTrickleRequest(models.Base):
    """Payload of the 'trickle' video room control option."""

    # ID for the subscriber session, if specified, otherwise the publisher is considered
    session = fields.StringField(required=False)
    candidates = fields.ListField([ICECandidate])
class VideoRoomControlFeedAttachRequest(models.Base):
    """Payload of the 'feed-attach' option: session and publisher, both required."""

    session = fields.StringField(required=True)
    publisher = fields.StringField(required=True)
class VideoRoomControlFeedAnswerRequest(models.Base):
    """Payload of the 'feed-answer' option: session and SDP, both required."""

    session = fields.StringField(required=True)
    sdp = fields.StringField(required=True)
class VideoRoomControlFeedDetachRequest(models.Base):
    """Payload of the 'feed-detach' option: the required session."""

    session = fields.StringField(required=True)
class VideoRoomControlInviteParticipantsRequest(models.Base):
    """Payload of the 'invite-participants' option: a list of URIs, each validated."""

    participants = fields.ListField([str, unicode], validators=[URIListValidator])
class VideoRoomControlRequest(VideoRoomRequestBase):
    """'videoroom-ctl' request: an option name plus the matching optional payload."""

    sylkrtc = DefaultValueField('videoroom-ctl')
    option = fields.StringField(required=True,
                                validators=[OptionsValidator(['trickle', 'feed-attach', 'feed-answer', 'feed-detach', 'invite-participants'])])

    # all other options should have optional fields below, and the application needs to do a little validation
    trickle = fields.EmbeddedField(VideoRoomControlTrickleRequest, required=False)
    feed_attach = fields.EmbeddedField(VideoRoomControlFeedAttachRequest, required=False)
    feed_answer = fields.EmbeddedField(VideoRoomControlFeedAnswerRequest, required=False)
    feed_detach = fields.EmbeddedField(VideoRoomControlFeedDetachRequest, required=False)
    invite_participants = fields.EmbeddedField(VideoRoomControlInviteParticipantsRequest, required=False)
class VideoRoomTerminateRequest(VideoRoomRequestBase):
    """'videoroom-terminate' request."""

    sylkrtc = DefaultValueField('videoroom-terminate')
diff --git a/sylk/applications/webrtcgateway/web/admin.py b/sylk/applications/webrtcgateway/web/admin.py
new file mode 100644
index 0000000..19f3d01
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/admin.py
@@ -0,0 +1,101 @@
+
+import json
+
+from application.python.types import Singleton
+from klein import Klein
+from twisted.internet import reactor
+from twisted.web.server import Site
+
+from sylk.applications.webrtcgateway.configuration import GeneralConfig
+from sylk.applications.webrtcgateway.logger import log
+from sylk.applications.webrtcgateway.web import push
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
+
+__all__ = ['AdminWebHandler']
+
+
class AuthError(Exception):
    """Raised when a request fails the management interface auth check."""
+
+
class AdminWebHandler(object):
    """HTTP management interface for push notifications and device tokens.

    Routes (all checked by _check_auth):
      POST        /incoming_session          -> push.incoming_session
      POST        /missed_session            -> push.missed_session
      (default)   /tokens/<account>          -> list stored tokens
      POST/DELETE /tokens/<account>/<token>  -> add / remove a token
    """

    __metaclass__ = Singleton

    app = Klein()

    def __init__(self):
        self.listener = None

    def start(self):
        """Start listening on GeneralConfig.http_management_interface."""
        host, port = GeneralConfig.http_management_interface
        self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host)
        log.msg('Admin web handler started at http://%s:%d' % (host, port))

    def stop(self):
        """Stop listening; safe to call when not started."""
        if self.listener is not None:
            self.listener.stopListening()
            self.listener = None

    # Admin web API

    def _check_auth(self, request):
        """Raise AuthError unless the Authorization header equals the configured secret.

        No check is made when no (or an empty) secret is configured.
        """
        import hmac  # local import: only needed here

        auth_secret = GeneralConfig.http_management_auth_secret
        if not auth_secret:
            return
        auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None)
        # constant-time comparison so the secret cannot be probed through response timing
        if not auth_headers or not hmac.compare_digest(auth_headers[0], auth_secret):
            raise AuthError()

    def _dispatch_push(self, request, notify):
        """Shared body of the push routes.

        Reads 'originator' and 'destination' from the JSON request body and
        calls notify(originator, destination, tokens) with the tokens stored
        for the destination. Any error while reading the body is reported as
        a JSON failure document.
        """
        self._check_auth(request)
        request.setHeader('Content-Type', 'application/json')
        try:
            data = json.load(request.content)
            originator = data['originator']
            destination = data['destination']
        except Exception as e:
            return json.dumps({'success': False, 'error': str(e)})
        storage = TokenStorage()
        tokens = storage[destination]
        notify(originator, destination, tokens)
        return json.dumps({'success': True})

    @app.handle_errors(AuthError)
    def auth_error(self, request, failure):
        request.setResponseCode(403)
        return 'Authentication error'

    @app.route('/incoming_session', methods=['POST'])
    def incoming_session(self, request):
        return self._dispatch_push(request, push.incoming_session)

    @app.route('/missed_session', methods=['POST'])
    def missed_session(self, request):
        return self._dispatch_push(request, push.missed_session)

    @app.route('/tokens/<string:account>')
    def get_tokens(self, request, account):
        self._check_auth(request)
        request.setHeader('Content-Type', 'application/json')
        storage = TokenStorage()
        tokens = storage[account]
        return json.dumps({'tokens': list(tokens)})

    @app.route('/tokens/<string:account>/<string:token>', methods=['POST', 'DELETE'])
    def process_token(self, request, account, token):
        self._check_auth(request)
        request.setHeader('Content-Type', 'application/json')
        storage = TokenStorage()
        if request.method == 'POST':
            storage.add(account, token)
        elif request.method == 'DELETE':
            storage.remove(account, token)
        return json.dumps({'success': True})
diff --git a/sylk/applications/webrtcgateway/web/factory.py b/sylk/applications/webrtcgateway/web/factory.py
index 52c4be3..317a5d4 100644
--- a/sylk/applications/webrtcgateway/web/factory.py
+++ b/sylk/applications/webrtcgateway/web/factory.py
@@ -1,79 +1,80 @@
import weakref
from application.notification import IObserver, NotificationCenter
from application.python import Null
+from application.python.types import Singleton
from autobahn.twisted.websocket import WebSocketServerFactory
from zope.interface import implements
from sylk.applications.webrtcgateway.web.protocol import SylkWebSocketServerProtocol, SYLK_WS_PROTOCOL
class VideoRoomContainer(object):
def __init__(self):
self._rooms = set()
self._uri_map = weakref.WeakValueDictionary()
self._id_map = weakref.WeakValueDictionary()
def add(self, room):
self._rooms.add(room)
self._uri_map[room.uri] = room
self._id_map[room.id] = room
def remove(self, value):
if isinstance(value, int):
room = self._id_map.get(value, None)
elif isinstance(value, basestring):
room = self._uri_map.get(value, None)
else:
room = value
self._rooms.discard(room)
def clear(self):
self._rooms.clear()
def __len__(self):
return len(self._rooms)
def __iter__(self):
return iter(self._rooms)
def __getitem__(self, item):
if isinstance(item, int):
return self._id_map[item]
elif isinstance(item, basestring):
return self._uri_map[item]
else:
raise KeyError('%s not found' % item)
def __contains__(self, item):
return item in self._id_map or item in self._uri_map or item in self._rooms
class SylkWebSocketServerFactory(WebSocketServerFactory):
    """WebSocket factory shared by all connections.

    Tracks the open connections and video rooms at class level and drops
    everything when a JanusBackendDisconnected notification arrives.
    """

    implements(IObserver)

    protocol = SylkWebSocketServerProtocol
    connections = set()
    videorooms = VideoRoomContainer()
    backend = None  # assigned by WebHandler

    def __init__(self, *args, **kw):
        super(SylkWebSocketServerFactory, self).__init__(*args, **kw)
        NotificationCenter().add_observer(self, name='JanusBackendDisconnected')

    def buildProtocol(self, addr):
        """Create a protocol instance wired to this factory and its backend."""
        instance = self.protocol()
        instance.factory = self
        instance.backend = self.backend
        return instance

    def handle_notification(self, notification):
        # dispatch to _NH_<name>; unknown notifications go to the Null object
        handler_name = '_NH_%s' % notification.name
        getattr(self, handler_name, Null)(notification)

    def _NH_JanusBackendDisconnected(self, notification):
        # iterate over a copy: the set is expected to change while connections are dropped
        for connection in self.connections.copy():
            connection.dropConnection(abort=True)
        self.videorooms.clear()
diff --git a/sylk/applications/webrtcgateway/web/handler.py b/sylk/applications/webrtcgateway/web/handler.py
index 381c583..611befd 100644
--- a/sylk/applications/webrtcgateway/web/handler.py
+++ b/sylk/applications/webrtcgateway/web/handler.py
@@ -1,1311 +1,1338 @@
import json
import os
import random
import re
import uuid
import weakref
from application.python import Null
from application.system import makedirs
from eventlib import coros, proc
from eventlib.twistedutil import block_on
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config
from sylk.applications.webrtcgateway.logger import log
from sylk.applications.webrtcgateway.models import sylkrtc
from sylk.applications.webrtcgateway.util import GreenEvent
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
SIP_PREFIX_RE = re.compile('^sips?:')
class ACLValidationError(Exception):
    """Raised by validate_acl when a URI is not allowed by the room policy."""
sylkrtc_models = {
# account management
'account-add' : sylkrtc.AccountAddRequest,
'account-remove' : sylkrtc.AccountRemoveRequest,
'account-register' : sylkrtc.AccountRegisterRequest,
'account-unregister' : sylkrtc.AccountUnregisterRequest,
+ 'account-devicetoken' : sylkrtc.AccountDeviceTokenRequest,
# session management
'session-create' : sylkrtc.SessionCreateRequest,
'session-answer' : sylkrtc.SessionAnswerRequest,
'session-trickle' : sylkrtc.SessionTrickleRequest,
'session-terminate' : sylkrtc.SessionTerminateRequest,
# video conference management
'videoroom-join' : sylkrtc.VideoRoomJoinRequest,
'videoroom-ctl' : sylkrtc.VideoRoomControlRequest,
'videoroom-terminate' : sylkrtc.VideoRoomTerminateRequest,
}
class AccountInfo(object):
    """Per-connection record of an added account.

    ``registration_state`` and ``janus_handle_id`` start as None.
    """

    def __init__(self, id, password, display_name=None, user_agent=None):
        self.id, self.password = id, password
        self.display_name, self.user_agent = display_name, user_agent
        self.registration_state = self.janus_handle_id = None

    @property
    def uri(self):
        """The account id prefixed with 'sip:'."""
        return 'sip:%s' % self.id
class SessionPartyIdentity(object):
    """URI plus display name (empty by default) of one party in a session."""

    def __init__(self, uri, display_name=''):
        self.uri, self.display_name = uri, display_name
class SIPSessionInfo(object):
    """State record for one SIP session.

    Created empty; init_outgoing() / init_incoming() fill in the direction,
    the 'connecting' state and both party identities.
    """

    def __init__(self, id):
        self.id = id
        self.direction = None
        self.state = None
        self.account_id = None
        self.local_identity = None   # instance of SessionPartyIdentity
        self.remote_identity = None  # instance of SessionPartyIdentity
        self.janus_handle_id = None
        self.ice_media_negotiation_started = False
        self.ice_media_negotiation_ended = False

    def _init_direction(self, direction, account_id, remote_identity):
        # common part of init_outgoing / init_incoming
        self.account_id = account_id
        self.direction = direction
        self.state = 'connecting'
        self.local_identity = SessionPartyIdentity(account_id)
        self.remote_identity = remote_identity

    def init_outgoing(self, account_id, destination):
        self._init_direction('outgoing', account_id, SessionPartyIdentity(destination))

    def init_incoming(self, account_id, originator, originator_display_name=''):
        remote = SessionPartyIdentity(originator, originator_display_name)
        self._init_direction('incoming', account_id, remote)
class VideoRoom(object):
    """A video conference room and the sessions added to it.

    Sessions are indexed weakly by session id and by publisher id;
    ``room[key]`` tries the session id first and then the publisher id.
    """

    def __init__(self, uri):
        self.config = get_room_config(uri)
        self.uri = uri
        self.record = self.config.record
        self.rec_dir = os.path.join(GeneralConfig.recording_dir, '%s/' % uri)
        self.id = random.getrandbits(32)  # janus needs numeric room names
        self.destroyed = False
        self._session_id_map = weakref.WeakValueDictionary()
        self._publisher_id_map = weakref.WeakValueDictionary()
        if self.record:
            # 0o755 is the same mode as the old 0755 literal, which is a syntax error on Python 3
            makedirs(self.rec_dir, 0o755)
            log.msg('Video room %s created with recording in %s' % (self.uri, self.rec_dir))
        else:
            log.msg('Video room %s created without recording' % self.uri)

    def add(self, session):
        """Index a session by its id and publisher id; both must be new."""
        assert session.publisher_id is not None
        assert session.id not in self._session_id_map
        assert session.publisher_id not in self._publisher_id_map
        self._session_id_map[session.id] = session
        self._publisher_id_map[session.publisher_id] = session
        log.msg('Video room %s: added session %s for %s' % (self.uri, session.id, session.account_id))

    def __getitem__(self, key):
        try:
            return self._session_id_map[key]
        except KeyError:
            return self._publisher_id_map[key]

    def __len__(self):
        return len(self._session_id_map)
class VideoRoomSessionInfo(object):
    """State record for one video room session (publisher or subscriber)."""

    def __init__(self, id):
        self.id = id
        self.account_id = None
        self.type = None          # publisher / subscriber
        self.publisher_id = None  # janus publisher ID for publishers / publisher session ID for subscribers
        self.janus_handle_id = None
        self.room = None
        self.parent_session = None
        self.feeds = {}           # janus publisher ID -> our publisher ID
        self.ice_media_negotiation_started = False
        self.ice_media_negotiation_ended = False

    def initialize(self, account_id, type, room):
        """Bind the session to an account, a role and a room, and log it."""
        assert type in ('publisher', 'subscriber')
        self.account_id, self.type, self.room = account_id, type, room
        log.msg('Video room %s: new session %s initialized by %s' % (self.room.uri, self.id, self.account_id))

    def __repr__(self):
        details = (self.__class__.__name__, self.id, self.janus_handle_id, self.type)
        return '<%s: id=%s janus_handle_id=%s type=%s>' % details
class VideoRoomSessionContainer(object):
    """Set of video room sessions, also reachable by session id or janus handle id.

    The set holds the strong references; both indexes are weak value
    dictionaries.
    """

    def __init__(self):
        self._sessions = set()
        self._janus_handle_map = weakref.WeakValueDictionary()
        self._id_map = weakref.WeakValueDictionary()

    def add(self, session):
        """Add a session that is not yet known by object, handle id or id."""
        assert session not in self._sessions
        assert session.janus_handle_id not in self._janus_handle_map
        assert session.id not in self._id_map
        self._sessions.add(session)
        self._janus_handle_map[session.janus_handle_id] = session
        self._id_map[session.id] = session

    def remove(self, session):
        self._sessions.discard(session)

    def count(self):
        return len(self._sessions)

    def clear(self):
        self._sessions.clear()

    def __len__(self):
        return len(self._sessions)

    def __iter__(self):
        return iter(self._sessions)

    def __getitem__(self, key):
        # session id takes precedence over janus handle id
        if key in self._id_map:
            return self._id_map[key]
        return self._janus_handle_map[key]

    def __contains__(self, item):
        lookups = (self._id_map, self._janus_handle_map, self._sessions)
        return any(item in lookup for lookup in lookups)
class Operation(object):
    """Name/data pair placed on the operations queue."""

    __slots__ = ('name', 'data')

    def __init__(self, name, data):
        self.name, self.data = name, data
class APIError(Exception):
    """Error reported back to the client as an error response."""
class ConnectionHandler(object):
def __init__(self, protocol):
    """Set up the per-connection state and spawn the operations consumer."""
    self.protocol = protocol
    self.janus_session_id = None
    self.accounts_map = {}  # account ID -> account
    self.account_handles_map = {}  # Janus handle ID -> account
    self.sessions_map = {}  # session ID -> session
    self.session_handles_map = {}  # Janus handle ID -> session
    self.videoroom_sessions = VideoRoomSessionContainer()  # session ID / janus handle ID -> session
    self.ready_event = GreenEvent()
    self.resolver = DNSLookup()
    # Create the queue before spawning its consumer: _operations_handler reads
    # self.operations_queue, so it must exist no matter when the greenlet runs.
    self.operations_queue = coros.queue()
    self.proc = proc.spawn(self._operations_handler)
@property
def end_point_address(self):
return self.protocol.peer
def start(self):
self._create_janus_session()
def stop(self):
    """Tear down the Janus session and consumer greenlet, then drop all state."""
    if self.ready_event.is_set():
        assert self.janus_session_id is not None
        backend = self.protocol.backend
        backend.janus_stop_keepalive(self.janus_session_id)
        backend.janus_destroy_session(self.janus_session_id)
    if self.proc is not None:
        self.proc.kill()
        self.proc = None
    # cleanup
    self.ready_event.clear()
    for mapping in (self.accounts_map, self.account_handles_map, self.sessions_map, self.session_handles_map):
        mapping.clear()
    self.videoroom_sessions.clear()
    self.janus_session_id = None
    self.protocol = None
def handle_message(self, data):
    """Validate an incoming message and queue it as an operation.

    ``data`` is the decoded message dictionary; its 'sylkrtc' key selects the
    request model. Messages without a type or with an unknown type are
    logged and dropped. Messages that fail to build or validate are logged
    and, when a transaction id is available, answered with an ErrorResponse.
    """
    try:
        request_type = data.pop('sylkrtc')
    except KeyError:
        log.warn('Error getting WebSocket message type')
        return
    self.ready_event.wait()
    try:
        model = sylkrtc_models[request_type]
    except (KeyError, TypeError):
        # TypeError: the client sent an unhashable value (list/dict) as the type
        log.warn('Unknown request type: %s' % request_type)
        return
    request = None
    try:
        # Building the model can already fail (fields validate values on
        # assignment), so it has to be inside the try along with validate().
        request = model(**data)
        request.validate()
    except Exception as e:
        log.error('%s: %s' % (request_type, e))
        if request is not None:
            transaction = request.transaction
        else:
            transaction = data.get('transaction')
        # only answer when there is a usable transaction id to refer to
        if transaction and isinstance(transaction, basestring):
            self._send_response(sylkrtc.ErrorResponse(transaction=transaction, error=str(e)))
        return
    op = Operation(request_type, request)
    self.operations_queue.send(op)
def validate_acl(self, room_uri, from_uri):
    """Raise ACLValidationError if from_uri may not access room_uri.

    With the 'allow,deny' policy the URI must match the allow list and not
    the deny list. With the other policy it is rejected only when it matches
    the deny list without matching the allow list.
    """
    cfg = get_room_config(room_uri)
    if cfg.access_policy == 'allow,deny':
        permitted = cfg.allow.match(from_uri) and not cfg.deny.match(from_uri)
    else:
        permitted = not (cfg.deny.match(from_uri) and not cfg.allow.match(from_uri))
    if not permitted:
        raise ACLValidationError
# internal methods (not overriding / implementing the protocol API)
def _send_response(self, response):
    """Validate a response model and send it JSON-encoded."""
    response.validate()
    payload = json.dumps(response.to_struct())
    self._send_data(payload)
def _send_data(self, data):
    """Send data over the protocol, logging it first when tracing is enabled."""
    protocol = self.protocol
    if GeneralConfig.trace_websocket:
        protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data)
    protocol.sendMessage(data, False)
def _cleanup_session(self, session):
# Schedule the removal of a finished session's bookkeeping two seconds from now.
# The work runs in a green thread because it blocks on the janus_detach call.
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is nothing to do here
return
self.sessions_map.pop(session.id)
if session.direction == 'outgoing':
# Destroy plugin handle for outgoing sessions. For incoming ones it's the
# same as the account handle, so don't
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
self.session_handles_map.pop(session.janus_handle_id)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _cleanup_videoroom_session(self, session):
# Schedule the removal of a video room session and the detach of its janus
# handle two seconds from now. Runs in a green thread because it blocks on janus_detach.
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is nothing to do here
return
if session in self.videoroom_sessions:
self.videoroom_sessions.remove(session)
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _maybe_destroy_videoroom(self, videoroom):
    """Destroy videoroom five seconds from now if it is empty by then.

    Nothing happens when videoroom is None, when the connection has been
    closed in the meantime, or when the room is non-empty or already
    destroyed at that point.
    """
    if videoroom is None:
        return

    @run_in_green_thread
    def f():
        if self.protocol is None:
            # The connection was closed
            return
        # destroy the room if empty
        if not videoroom and not videoroom.destroyed:
            videoroom.destroyed = True
            self.protocol.factory.videorooms.remove(videoroom)

            # create a handle to do the cleanup
            handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
            self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
            data = {'request': 'destroy',
                    'room': videoroom.id}
            try:
                block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
            except Exception as e:
                # best effort: keep going so the handle is still detached, but leave a trace
                log.warn('Video room %s: error destroying room in Janus: %s' % (videoroom.uri, e))
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
            self.protocol.backend.janus_set_event_handler(handle_id, None)

            log.msg('Video room %s destroyed' % videoroom.uri)

    # don't destroy it immediately
    reactor.callLater(5, f)
@run_in_green_thread
def _create_janus_session(self):
    """Create the Janus session (once) and tell the client we are ready.

    If the session cannot be created the connection is closed with code 3000.
    """
    if not self.ready_event.is_set():
        backend = self.protocol.backend
        try:
            self.janus_session_id = block_on(backend.janus_create_session())
            backend.janus_start_keepalive(self.janus_session_id)
        except Exception as e:
            log.warn('Error creating session, disconnecting: %s' % e)
            self.protocol.disconnect(3000, unicode(e))
            return
        self._send_response(sylkrtc.ReadyEvent())
        self.ready_event.set()
    else:
        # already set up: just repeat the ready event
        self._send_response(sylkrtc.ReadyEvent())
def _lookup_sip_proxy(self, account):
    """Resolve the SIP proxy for account and return it as a proxy URI string.

    Uses the configured outbound proxy when there is one, otherwise the
    account's own domain. Raises DNSLookupError when nothing is found.
    """
    sip_uri = SIPURI.parse('sip:%s' % account)

    # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed
    # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use
    # caching to avoid long delays, we randomize the results matching the highest priority route's
    # transport.

    proxy = GeneralConfig.outbound_sip_proxy
    if proxy is None:
        proxy_uri = SIPURI(host=sip_uri.host)
    else:
        proxy_uri = SIPURI(host=proxy.host,
                           port=proxy.port,
                           parameters={'transport': proxy.transport})
    settings = SIPSimpleSettings()
    routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait()
    if not routes:
        raise DNSLookupError('no results found')

    # Get all routes with the highest priority transport and randomly pick one
    preferred_transport = routes[0].transport
    candidates = [r for r in routes if r.transport == preferred_transport]
    route = random.choice(candidates)

    # Build a proxy URI Sofia-SIP likes
    if route.transport == 'tls':
        scheme, transport_param = 'sips', ''
    else:
        scheme, transport_param = 'sip', ';transport=%s' % route.transport
    return '%s:%s:%d%s' % (scheme, route.address, route.port, transport_param)
def _handle_conference_invite(self, originator, room_uri, participants):
    """Send a conference_invite account event for each participant added on this connection.

    Participants that are not in accounts_map are skipped.
    """
    for participant in participants:
        if participant not in self.accounts_map:
            continue
        account_info = self.accounts_map[participant]
        inviter = dict(uri=originator.id, display_name=originator.display_name)
        data = dict(sylkrtc='account_event',
                    account=account_info.id,
                    event='conference_invite',
                    data=dict(originator=inviter,
                              room=room_uri))
        log.msg('Video room %s: invitation from %s to %s' % (room_uri, originator.id, account_info.id))
        self._send_data(json.dumps(data))
def _handle_janus_event_sip(self, handle_id, event_type, event):
    """Queue a Janus SIP plugin event as a 'janus-event-sip' operation."""
    # TODO: use a model
    payload = dict(handle_id=handle_id, event_type=event_type, event=event)
    self.operations_queue.send(Operation('janus-event-sip', data=payload))
def _handle_janus_event_videoroom(self, handle_id, event_type, event):
    """Queue a Janus videoroom plugin event as a 'janus-event-videoroom' operation."""
    # TODO: use a model
    payload = dict(handle_id=handle_id, event_type=event_type, event=event)
    self.operations_queue.send(Operation('janus-event-videoroom', data=payload))
def _operations_handler(self):
    """Consume the operations queue forever, dispatching to the _OH_<name> methods.

    Dashes in the operation name become underscores; operations without a
    handler go to the Null object. Handler exceptions are logged and the
    loop carries on.
    """
    while True:
        op = self.operations_queue.wait()
        handler_name = '_OH_%s' % op.name.replace('-', '_')
        handler = getattr(self, handler_name, Null)
        try:
            handler(op.data)
        except Exception:
            log.exception('Unhandled exception in operation "%s"' % op.name)
        # drop the references before waiting for the next operation
        del op, handler
def _OH_account_add(self, request):
    """Handle 'account-add': record the account unless it exists or its domain is not allowed."""
    # extract the fields to avoid going through the descriptor several times
    account = request.account
    password = request.password
    display_name = request.display_name
    user_agent = request.user_agent

    error = None
    if account in self.accounts_map:
        error = 'Account %s already added' % account
    else:
        # check if domain is acceptable
        domain = account.partition('@')[2]
        if not {'*', domain}.intersection(GeneralConfig.sip_domains):
            error = 'SIP domain not allowed: %s' % domain

    if error is not None:
        log.error('account_add: %s' % error)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=error))
        return

    # Create and store our mapping
    account_info = AccountInfo(account, password, display_name, user_agent)
    self.accounts_map[account_info.id] = account_info

    log.msg('Account %s added using %s at %s' % (account, user_agent, self.end_point_address))
    self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_remove(self, request):
    """Handle an account-remove request.

    Removes the account from self.accounts_map and, when it owns a Janus
    plugin handle, detaches that handle and forgets it. Replies with an
    AckResponse, or an ErrorResponse when the account is unknown.
    """
    # read the request field once to avoid going through the descriptor several times
    account = request.account
    try:
        try:
            removed = self.accounts_map.pop(account)
        except KeyError:
            raise APIError('Unknown account specified: %s' % account)
        janus_handle = removed.janus_handle_id
        if janus_handle is not None:
            # release the plugin handle and stop routing its events
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, janus_handle))
            self.protocol.backend.janus_set_event_handler(janus_handle, None)
            self.account_handles_map.pop(janus_handle)
    except APIError as e:
        log.error('account_remove: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('Account %s removed' % account)
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_register(self, request):
    """Handle an account-register request.

    Looks up the SIP proxy for the account, replaces any plugin handle the
    account already had with a newly attached 'janus.plugin.sip' handle and
    sends a 'register' message on it. Replies with an AckResponse, or an
    ErrorResponse when the account is unknown or the DNS lookup fails.
    """
    # read the request field once to avoid going through the descriptor several times
    account = request.account
    try:
        try:
            info = self.accounts_map[account]
        except KeyError:
            raise APIError('Unknown account specified: %s' % account)
        try:
            proxy = self._lookup_sip_proxy(account)
        except DNSLookupError:
            raise APIError('DNS lookup error')

        previous_handle = info.janus_handle_id
        if previous_handle is not None:
            # Destroy the existing plugin handle
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, previous_handle))
            self.protocol.backend.janus_set_event_handler(previous_handle, None)
            self.account_handles_map.pop(previous_handle)
            info.janus_handle_id = None

        # Create a plugin handle and route its events to the SIP event handler
        new_handle = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
        self.protocol.backend.janus_set_event_handler(new_handle, self._handle_janus_event_sip)
        info.janus_handle_id = new_handle
        self.account_handles_map[new_handle] = info

        register_message = {'request': 'register',
                            'username': info.uri,
                            'display_name': info.display_name,
                            'user_agent': info.user_agent,
                            'ha1_secret': info.password,
                            'proxy': proxy}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, new_handle, register_message))
    except APIError as e:
        log.error('account-register: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('Account %s will register using %s' % (account, info.user_agent))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_unregister(self, request):
    """Handle an account-unregister request.

    Detaches the account's Janus plugin handle, if it has one, and clears
    the handle bookkeeping; the account itself stays in self.accounts_map.
    Replies with an AckResponse, or an ErrorResponse when the account is
    unknown.
    """
    # read the request field once to avoid going through the descriptor several times
    account = request.account
    try:
        try:
            info = self.accounts_map[account]
        except KeyError:
            raise APIError('Unknown account specified: %s' % account)
        janus_handle = info.janus_handle_id
        if janus_handle is not None:
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, janus_handle))
            self.protocol.backend.janus_set_event_handler(janus_handle, None)
            info.janus_handle_id = None
            self.account_handles_map.pop(janus_handle)
    except APIError as e:
        log.error('account-unregister: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('Account %s will unregister' % account)
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_devicetoken(self, request):
    """Handle an account-devicetoken request.

    Removes request.old_token and/or adds request.new_token for the account
    in TokenStorage; either token may be None, in which case that step is
    skipped. Replies with an AckResponse, or an ErrorResponse when the
    account has not been added on this connection.
    """
    # extract the fields to avoid going through the descriptor several times
    account = request.account
    old_token = request.old_token
    new_token = request.new_token

    try:
        # only accounts known to this connection may change their tokens
        if account not in self.accounts_map:
            raise APIError('Unknown account specified: %s' % account)

        storage = TokenStorage()
        if old_token is not None:
            storage.remove(account, old_token)
            # only the first characters of the token are written to the log
            log.msg('Removed device token %s... for account %s' % (old_token[:5], account))
        if new_token is not None:
            storage.add(account, new_token)
            log.msg('Added device token %s... for account %s' % (new_token[:5], account))
    except APIError as e:
        log.error('account-devicetoken: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_create(self, request):
    """Handle a session-create request (outgoing SIP call).

    Attaches a new 'janus.plugin.sip' handle, configures it with the
    account's credentials (with send_register set to False), records a
    SIPSessionInfo for it and sends the 'call' request together with the
    client's SDP offer. Replies with an AckResponse, or an ErrorResponse
    when the account is unknown, the session ID is already used or the DNS
    lookup fails.
    """
    # read each request field once to avoid going through the descriptor several times
    account = request.account
    session = request.session
    uri = request.uri
    sdp = request.sdp
    try:
        try:
            account_info = self.accounts_map[account]
        except KeyError:
            raise APIError('Unknown account specified: %s' % account)
        if session in self.sessions_map:
            raise APIError('Session ID (%s) already in use' % session)

        # Create a new plugin handle and 'register' it, without actually doing so
        janus_handle = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
        self.protocol.backend.janus_set_event_handler(janus_handle, self._handle_janus_event_sip)
        try:
            proxy = self._lookup_sip_proxy(account_info.id)
        except DNSLookupError:
            # give back the handle that was just attached before reporting the error
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, janus_handle))
            self.protocol.backend.janus_set_event_handler(janus_handle, None)
            raise APIError('DNS lookup error')

        register_message = {'request': 'register',
                            'username': account_info.uri,
                            'display_name': account_info.display_name,
                            'user_agent': account_info.user_agent,
                            'ha1_secret': account_info.password,
                            'proxy': proxy,
                            'send_register': False}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, janus_handle, register_message))

        session_info = SIPSessionInfo(session)
        session_info.janus_handle_id = janus_handle
        session_info.init_outgoing(account, uri)
        # TODO: create a "SessionContainer" object combining the 2
        self.sessions_map[session_info.id] = session_info
        self.session_handles_map[janus_handle] = session_info

        call_message = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'}
        offer = {'type': 'offer', 'sdp': sdp}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, janus_handle, call_message, offer))
    except APIError as e:
        log.error('session-create: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('Outgoing session %s from %s to %s created using %s from %s' % (session, account, uri, account_info.user_agent, self.end_point_address))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_answer(self, request):
    """Handle a session-answer request.

    Sends an 'accept' request with the client's SDP answer on the session's
    Janus handle. Only incoming sessions in the 'connecting' state can be
    answered; otherwise an ErrorResponse is returned.
    """
    # read each request field once to avoid going through the descriptor several times
    session = request.session
    sdp = request.sdp
    try:
        try:
            session_info = self.sessions_map[session]
        except KeyError:
            raise APIError('Unknown session specified: %s' % session)
        if session_info.direction != 'incoming':
            raise APIError('Cannot answer outgoing session')
        if session_info.state != 'connecting':
            raise APIError('Invalid state for session answer')
        accept_message = {'request': 'accept'}
        answer = {'type': 'answer', 'sdp': sdp}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, accept_message, answer))
    except APIError as e:
        log.error('session-answer: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('%s answered session %s' % (session_info.account_id, session))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_trickle(self, request):
    """Handle a session-trickle request by forwarding ICE candidates to Janus.

    On success the first non-empty candidate list marks the start of ICE
    negotiation for the session and an empty list marks its end; both are
    logged and recorded on the session before the AckResponse is sent.
    """
    # read each request field once to avoid going through the descriptor several times
    session = request.session
    candidates = [candidate.to_struct() for candidate in request.candidates]
    try:
        try:
            session_info = self.sessions_map[session]
        except KeyError:
            raise APIError('Unknown session specified: %s' % session)
        if session_info.state == 'terminated':
            raise APIError('Session is terminated')
        try:
            account_info = self.accounts_map[session_info.account_id]
        except KeyError:
            raise APIError('Unknown account specified: %s' % session_info.account_id)
        block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates))
    except APIError as e:
        #log.error('session-trickle: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        if not candidates:
            log.msg('Session %s: ICE negotiation ended by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
            session_info.ice_media_negotiation_ended = True
        elif not session_info.ice_media_negotiation_started:
            log.msg('Session %s: ICE negotiation started by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
            session_info.ice_media_negotiation_started = True
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_terminate(self, request):
    """Handle a session-terminate request.

    An incoming session that is still 'connecting' is declined with code
    486; any other live session is hung up. Replies with an AckResponse, or
    an ErrorResponse when the session is unknown or not in a state that can
    be terminated.
    """
    # read the request field once to avoid going through the descriptor several times
    session = request.session
    try:
        try:
            session_info = self.sessions_map[session]
        except KeyError:
            raise APIError('Unknown session specified: %s' % session)
        if session_info.state not in ('connecting', 'progress', 'accepted', 'established'):
            raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state)
        unanswered_incoming = session_info.direction == 'incoming' and session_info.state == 'connecting'
        message = {'request': 'decline', 'code': 486} if unanswered_incoming else {'request': 'hangup'}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, message))
    except APIError as e:
        log.error('session-terminate: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('%s terminated session %s' % (session_info.account_id, session))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_videoroom_join(self, request):
    """Handle a videoroom-join request.

    Validates the ACL and the account, attaches a 'janus.plugin.videoroom'
    handle, asks Janus to create the room (tolerating error code 427), then
    joins it as a publisher with the client's SDP offer. On success an
    AckResponse and a 'progress' state event are sent; on APIError an
    ErrorResponse is sent and _maybe_destroy_videoroom is called with the
    room (which is None when the failure happened before the room lookup).
    """
    account = request.account
    session = request.session
    uri = request.uri
    sdp = request.sdp
    videoroom = None
    try:
        try:
            self.validate_acl(uri, account)
        except ACLValidationError:
            raise APIError('%s is not allowed to join room %s' % (account, uri))
        try:
            account_info = self.accounts_map[account]
        except KeyError:
            raise APIError('Unknown account specified: %s' % account)
        if session in self.videoroom_sessions:
            raise APIError('Video room session ID (%s) already in use' % session)

        janus_handle = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
        self.protocol.backend.janus_set_event_handler(janus_handle, self._handle_janus_event_videoroom)

        # create the room if it doesn't exist
        try:
            videoroom = self.protocol.factory.videorooms[uri]
        except KeyError:
            videoroom = VideoRoom(uri)
            self.protocol.factory.videorooms.add(videoroom)
        create_message = {'request': 'create',
                          'room': videoroom.id,
                          'publishers': 10,
                          'record': videoroom.record,
                          'rec_dir': videoroom.rec_dir}
        try:
            block_on(self.protocol.backend.janus_message(self.janus_session_id, janus_handle, create_message))
        except Exception as e:
            # 427 == room exists; any other failure releases the handle and aborts the join
            if getattr(e, 'code', -1) != 427:
                block_on(self.protocol.backend.janus_detach(self.janus_session_id, janus_handle))
                self.protocol.backend.janus_set_event_handler(janus_handle, None)
                raise APIError(str(e))

        # join the room
        join_message = {'request': 'joinandconfigure',
                        'room': videoroom.id,
                        'ptype': 'publisher',
                        'audio': True,
                        'video': True}
        if account_info.display_name:
            join_message['display'] = account_info.display_name
        offer = {'type': 'offer', 'sdp': sdp}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, janus_handle, join_message, offer))

        videoroom_session = VideoRoomSessionInfo(session)
        videoroom_session.janus_handle_id = janus_handle
        videoroom_session.initialize(account, 'publisher', videoroom)
        self.videoroom_sessions.add(videoroom_session)
    except APIError as e:
        log.error('videoroom-join: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        self._maybe_destroy_videoroom(videoroom)
    else:
        log.msg('Video room %s: joined by %s using %s (%d participants present) from %s' % (videoroom.uri, account, account_info.user_agent, self.videoroom_sessions.count(), self.end_point_address))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
        state_event = {'sylkrtc': 'videoroom_event',
                       'session': videoroom_session.id,
                       'event': 'state',
                       'data': {'state': 'progress'}}
        self._send_data(json.dumps(state_event))
def _OH_videoroom_ctl(self, request):
    """Handle a videoroom-ctl request, dispatching on request.option.

    Supported options, each carrying its own sub-structure on the request:

    - 'trickle': forward ICE candidates for a video room session to Janus.
    - 'feed-attach': subscribe to a publisher's feed on a new plugin handle.
    - 'feed-answer': send the client's SDP answer for a subscription.
    - 'feed-detach': leave and tear down a subscription.
    - 'invite-participants': relay a conference invite to the other
      connections known to the protocol factory.

    A missing sub-structure or an unsupported option is only logged; no
    response is sent in those cases. Otherwise the request is answered with
    an AckResponse or, on APIError, an ErrorResponse.
    """
    if request.option == 'trickle':
        trickle = request.trickle
        if not trickle:
            log.error('videoroom-ctl: missing field')
            return
        candidates = [c.to_struct() for c in trickle.candidates]
        # the sub-structure may name its own session; fall back to the request's
        session = trickle.session or request.session
        try:
            try:
                videoroom_session = self.videoroom_sessions[session]
            except KeyError:
                raise APIError('trickle: unknown video room session ID specified: %s' % session)
            try:
                account_info = self.accounts_map[videoroom_session.account_id]
            except KeyError:
                raise APIError('Unknown account specified: %s' % videoroom_session.account_id)
            block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates))
        except APIError as e:
            #log.error('videoroom-ctl: %s' % e)
            self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        else:
            # a non-empty list marks the start of ICE negotiation, an empty one its end
            if candidates:
                if not videoroom_session.ice_media_negotiation_started:
                    log.msg('Video room %s: ICE negotiation started by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
                    videoroom_session.ice_media_negotiation_started = True
            else:
                log.msg('Video room %s: ICE negotiation ended by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
                videoroom_session.ice_media_negotiation_ended = True
            self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
    elif request.option == 'feed-attach':
        feed_attach = request.feed_attach
        if not feed_attach:
            log.error('videoroom-ctl: missing field')
            return
        try:
            if feed_attach.session in self.videoroom_sessions:
                raise APIError('feed-attach: video room session ID (%s) already in use' % feed_attach.session)
            # get the 'base' session, the one used to join and publish
            try:
                base_session = self.videoroom_sessions[request.session]
            except KeyError:
                raise APIError('feed-attach: unknown video room session ID specified: %s' % request.session)
            # get the publisher's session
            try:
                publisher_session = base_session.room[feed_attach.publisher]
            except KeyError:
                raise APIError('feed-attach: unknown publisher video room session ID specified: %s' % feed_attach.publisher)
            if publisher_session.publisher_id is None:
                # the message needs a conversion specifier; without one the
                # '%' operation raises TypeError instead of the APIError
                raise APIError('feed-attach: video room session ID (%s) does not have a publisher ID' % feed_attach.publisher)
            handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
            self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
            # join the room as a listener
            data = {'request': 'join',
                    'room': base_session.room.id,
                    'ptype': 'listener',
                    'feed': publisher_session.publisher_id}
            block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
            videoroom_session = VideoRoomSessionInfo(feed_attach.session)
            videoroom_session.janus_handle_id = handle_id
            videoroom_session.parent_session = base_session
            videoroom_session.publisher_id = publisher_session.id
            videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room)
            self.videoroom_sessions.add(videoroom_session)
            # remember which publisher session each Janus publisher ID maps to
            base_session.feeds[publisher_session.publisher_id] = publisher_session.id
        except APIError as e:
            log.error('videoroom-ctl: %s' % e)
            self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        else:
            log.msg('Video room %s: %s attached to %s' % (base_session.room.uri, base_session.account_id, feed_attach.publisher))
            self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
    elif request.option == 'feed-answer':
        feed_answer = request.feed_answer
        if not feed_answer:
            log.error('videoroom-ctl: missing field')
            return
        try:
            try:
                videoroom_session = self.videoroom_sessions[feed_answer.session]
            except KeyError:
                raise APIError('feed-answer: unknown video room session ID specified: %s' % feed_answer.session)
            data = {'request': 'start',
                    'room': videoroom_session.room.id}
            jsep = {'type': 'answer', 'sdp': feed_answer.sdp}
            block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep))
        except APIError as e:
            log.error('videoroom-ctl: %s' % e)
            self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        else:
            self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
    elif request.option == 'feed-detach':
        feed_detach = request.feed_detach
        if not feed_detach:
            log.error('videoroom-ctl: missing field')
            return
        try:
            try:
                base_session = self.videoroom_sessions[request.session]
            except KeyError:
                raise APIError('feed-detach: unknown video room session ID specified: %s' % request.session)
            try:
                videoroom_session = self.videoroom_sessions[feed_detach.session]
            except KeyError:
                raise APIError('feed-detach: unknown video room session ID specified: %s' % feed_detach.session)
            data = {'request': 'leave'}
            block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
        except APIError as e:
            log.error('videoroom-ctl: %s' % e)
            self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        else:
            self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
            block_on(self.protocol.backend.janus_detach(self.janus_session_id, videoroom_session.janus_handle_id))
            self.protocol.backend.janus_set_event_handler(videoroom_session.janus_handle_id, None)
            self.videoroom_sessions.remove(videoroom_session)
            # drop the feeds entry that points at the detached subscription's publisher, if any
            try:
                janus_publisher_id = next(k for k, v in base_session.feeds.iteritems() if v == videoroom_session.publisher_id)
            except StopIteration:
                pass
            else:
                base_session.feeds.pop(janus_publisher_id)
    elif request.option == 'invite-participants':
        invite_participants = request.invite_participants
        if not invite_participants:
            log.error('videoroom-ctl: missing field')
            return
        try:
            try:
                base_session = self.videoroom_sessions[request.session]
                account_info = self.accounts_map[base_session.account_id]
            except KeyError:
                raise APIError('invite-participants: unknown video room session ID specified: %s' % request.session)
        except APIError as e:
            log.error('videoroom-ctl: %s' % e)
            self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
        else:
            self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
            # hand the invite to every other connection that has a handler
            for conn in self.protocol.factory.connections.difference([self]):
                if conn.connection_handler:
                    conn.connection_handler._handle_conference_invite(account_info, base_session.room.uri, invite_participants.participants)
    else:
        log.error('videoroom-ctl: unsupported option: %s' % request.option)
def _OH_videoroom_terminate(self, request):
    """Handle a videoroom-terminate request.

    Sends a 'leave' request on the session's Janus handle. On success the
    client gets an AckResponse followed by a 'terminated' state event, and
    the session and (possibly) its room are cleaned up. An unknown session
    ID is answered with an ErrorResponse.
    """
    session = request.session
    try:
        try:
            videoroom_session = self.videoroom_sessions[session]
        except KeyError:
            raise APIError('Unknown video room session ID specified: %s' % session)
        leave_message = {'request': 'leave'}
        block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, leave_message))
    except APIError as e:
        log.error('videoroom-terminate: %s' % e)
        self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
    else:
        log.msg('Video room %s: %s left the room (%d participants present)' % (videoroom_session.room.uri, videoroom_session.account_id, self.videoroom_sessions.count()))
        self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
        state_event = {'sylkrtc': 'videoroom_event',
                       'session': videoroom_session.id,
                       'event': 'state',
                       'data': {'state': 'terminated'}}
        self._send_data(json.dumps(state_event))
        self._cleanup_videoroom_session(videoroom_session)
        self._maybe_destroy_videoroom(videoroom_session.room)
# Event handlers
def _OH_janus_event_sip(self, data):
    """Process a queued Janus event for a SIP plugin handle.

    data is the dict queued by _handle_janus_event_sip, with the keys
    'handle_id', 'event_type' and 'event'. Plugin events are delegated to
    _janus_event_plugin_sip; 'webrtcup' marks the session established and
    'hangup' terminates it, each notifying the client with a session_event.
    """
    handle_id = data['handle_id']
    event_type = data['event_type']
    event = data['event']

    if event_type == 'event':
        self._janus_event_plugin_sip(data)
    elif event_type == 'webrtcup':
        try:
            session_info = self.session_handles_map[handle_id]
        except KeyError:
            log.msg('Could not find session for handle ID %s' % handle_id)
            return
        session_info.state = 'established'
        notification = {'sylkrtc': 'session_event',
                        'session': session_info.id,
                        'event': 'state',
                        'data': {'state': session_info.state}}
        direction = session_info.direction.title()
        # log the parties in caller -> callee order
        if direction == 'Outgoing':
            caller, callee = session_info.local_identity.uri, session_info.remote_identity.uri
        else:
            caller, callee = session_info.remote_identity.uri, session_info.local_identity.uri
        log.msg('%s session %s from %s to %s state: %s' % (direction, session_info.id, caller, callee, session_info.state))
        # TODO: SessionEvent model
        self._send_data(json.dumps(notification))
    elif event_type == 'hangup':
        try:
            session_info = self.session_handles_map[handle_id]
        except KeyError:
            log.msg('Could not find session for handle ID %s' % handle_id)
            return
        # nothing to do when the session was already terminated
        if session_info.state != 'terminated':
            session_info.state = 'terminated'
            reason = '%d %s' % (event.get('code', 0), event.get('reason', 'Unknown'))
            notification = {'sylkrtc': 'session_event',
                            'session': session_info.id,
                            'event': 'state',
                            'data': {'state': session_info.state, 'reason': reason}}
            # TODO: SessionEvent model
            self._send_data(json.dumps(notification))
            self._cleanup_session(session_info)
            direction = session_info.direction.title()
            if direction == 'Outgoing':
                caller, callee = session_info.local_identity.uri, session_info.remote_identity.uri
            else:
                caller, callee = session_info.remote_identity.uri, session_info.local_identity.uri
            log.msg('%s session %s from %s to %s terminated (%s)' % (direction, session_info.id, caller, callee, reason))
    elif event_type not in ('media', 'detached', 'slowlink'):
        # 'media' and 'detached' are ignored; 'slowlink' is a TODO
        log.warn('Received unexpected event type: %s' % event_type)
def _janus_event_plugin_sip(self, data):
    """Process a plugin-level event emitted by the Janus SIP plugin.

    data carries 'handle_id' and 'event'; the event's plugindata must come
    from 'janus.plugin.sip' and carry a 'result' section, otherwise a
    warning is logged. Depending on the result's 'event' value this updates
    account registration state, creates incoming sessions, advances or
    terminates existing sessions, or reports missed calls, notifying the
    client through _send_data each time.
    """
    handle_id = data['handle_id']
    event = data['event']
    plugin_data = event['plugindata']
    assert(plugin_data['plugin'] == 'janus.plugin.sip')
    event_data = plugin_data['data']
    assert(event_data.get('sip', '') == 'event')
    if 'result' not in event_data:
        log.warn('Unexpected event: %s' % event)
        return
    result = event_data['result']
    jsep = event.get('jsep', None)
    event_type = result['event']

    if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'):
        # skip 'registered' events from session handles
        if event_type == 'registered' and result['register_sent'] in (False, 'false'):
            return
        # account event
        try:
            account_info = self.account_handles_map[handle_id]
        except KeyError:
            log.warn('Could not find account for handle ID %s' % handle_id)
            return
        if event_type == 'incomingcall':
            originator_uri = SIP_PREFIX_RE.sub('', result['username'])
            originator_display_name = result.get('displayname', '').replace('"', '')
            jsep = event.get('jsep', None)
            assert jsep is not None
            # an incoming call gets a locally generated session ID
            session_id = uuid.uuid4().hex
            session = SIPSessionInfo(session_id)
            session.janus_handle_id = handle_id
            session.init_incoming(account_info.id, originator_uri, originator_display_name)
            self.sessions_map[session_id] = session
            self.session_handles_map[handle_id] = session
            notification = {'sylkrtc': 'account_event',
                            'account': account_info.id,
                            'session': session_id,
                            'event': 'incoming_session',
                            'data': {'originator': session.remote_identity.__dict__, 'sdp': jsep['sdp']}}
            log.msg('Incoming session %s from %s to %s created' % (session.id,
                                                                  session.remote_identity.uri,
                                                                  session.local_identity.uri))
        else:
            registration_state = 'failed' if event_type == 'registration_failed' else event_type
            # only report actual changes of the registration state
            if account_info.registration_state == registration_state:
                return
            account_info.registration_state = registration_state
            registration_data = {'state': registration_state}
            if registration_state == 'failed':
                code = result['code']
                reason = result['reason']
                registration_data['reason'] = '%d %s' % (code, reason)
                log.msg('Account %s registration failed: %s (%s)' % (account_info.id, code, reason))
            elif registration_state == 'registered':
                log.msg('Account %s registered using %s from %s' % (account_info.id, account_info.user_agent, self.end_point_address))
            notification = {'sylkrtc': 'account_event',
                            'account': account_info.id,
                            'event': 'registration_state',
                            'data': registration_data}
        # TODO: AccountEvent model
        self._send_data(json.dumps(notification))
    elif event_type in ('calling', 'accepted', 'hangup'):
        # session event
        try:
            session_info = self.session_handles_map[handle_id]
        except KeyError:
            log.warn('Could not find session for handle ID %s' % handle_id)
            return
        if event_type == 'hangup' and session_info.state == 'terminated':
            return
        # map the plugin event onto the session state reported to the client
        session_info.state = {'calling': 'progress',
                              'accepted': 'accepted',
                              'hangup': 'terminated'}[event_type]
        notification = {'sylkrtc': 'session_event',
                        'session': session_info.id,
                        'event': 'state',
                        'data': {'state': session_info.state}}
        direction = session_info.direction.title()
        # log the parties in caller -> callee order
        if direction == 'Outgoing':
            caller, callee = session_info.local_identity.uri, session_info.remote_identity.uri
        else:
            caller, callee = session_info.remote_identity.uri, session_info.local_identity.uri
        log.msg('%s session %s from %s to %s state: %s' % (direction, session_info.id, caller, callee, session_info.state))
        if session_info.state == 'accepted' and session_info.direction == 'outgoing':
            # the answer SDP is passed on to the client that made the call
            assert jsep is not None
            notification['data']['sdp'] = jsep['sdp']
        elif session_info.state == 'terminated':
            code = result.get('code', 0)
            reason = '%d %s' % (code, result.get('reason', 'Unknown'))
            notification['data']['reason'] = reason
        # TODO: SessionEvent model
        self._send_data(json.dumps(notification))
        if session_info.state == 'terminated':
            self._cleanup_session(session_info)
            direction = session_info.direction.title()
            if direction == 'Outgoing':
                caller, callee = session_info.local_identity.uri, session_info.remote_identity.uri
            else:
                caller, callee = session_info.remote_identity.uri, session_info.local_identity.uri
            log.msg('%s session %s from %s to %s terminated (%s)' % (direction, session_info.id, caller, callee, reason))
            # check if missed incoming call
            if session_info.direction == 'incoming' and code == 487:
                missed = {'sylkrtc': 'account_event',
                          'account': session_info.account_id,
                          'event': 'missed_session',
                          'data': {'originator': session_info.remote_identity.__dict__}}
                log.msg('Incoming session from %s to %s was not answered ' % (session_info.remote_identity.uri, session_info.local_identity.uri))
                # TODO: AccountEvent model
                self._send_data(json.dumps(missed))
    elif event_type == 'missed_call':
        try:
            account_info = self.account_handles_map[handle_id]
        except KeyError:
            log.warn('Could not find account for handle ID %s' % handle_id)
            return
        originator_uri = SIP_PREFIX_RE.sub('', result['caller'])
        originator_display_name = result.get('displayname', '').replace('"', '')
        # We have no session, so create an identity object by hand
        originator = SessionPartyIdentity(originator_uri, originator_display_name)
        missed = {'sylkrtc': 'account_event',
                  'account': account_info.id,
                  'event': 'missed_session',
                  'data': {'originator': originator.__dict__}}
        log.msg('Incoming session from %s missed' % originator.uri)
        # TODO: AccountEvent model
        self._send_data(json.dumps(missed))
    elif event_type not in ('ack', 'declining', 'hangingup', 'proceeding'):
        # the four event types listed above are deliberately ignored
        log.warn('Unexpected SIP plugin event type: %s' % event_type)
def _OH_janus_event_videoroom(self, data):
    """Process a queued Janus event for a videoroom plugin handle.

    data is the dict queued by _handle_janus_event_videoroom, with the keys
    'handle_id', 'event_type' and 'event'. Plugin events are delegated to
    _janus_event_plugin_videoroom; 'webrtcup' notifies the client that the
    session (or a feed subscription) is established; 'hangup' cleans the
    session up; 'slowlink' is only logged. Lookup failures are logged as
    warnings, since there is no request to send an error response for.
    """
    handle_id = data['handle_id']
    event_type = data['event_type']
    if event_type == 'event':
        self._janus_event_plugin_videoroom(data)
    elif event_type == 'webrtcup':
        try:
            videoroom_session = self.videoroom_sessions[handle_id]
        except KeyError:
            log.warn('Could not find videoroom session for handle ID %s' % handle_id)
            return
        base_session = videoroom_session.parent_session
        if base_session is None:
            # no parent session: this is the session used to join the room
            data = dict(sylkrtc='videoroom_event',
                        session=videoroom_session.id,
                        event='state',
                        data=dict(state='established'))
        else:
            # this is a subscriber session
            data = dict(sylkrtc='videoroom_event',
                        session=base_session.id,
                        event='feed_established',
                        data=dict(state='established', subscription=videoroom_session.id))
        log.msg('Video room %s: session established to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
        self._send_data(json.dumps(data))
    elif event_type == 'hangup':
        try:
            videoroom_session = self.videoroom_sessions[handle_id]
        except KeyError:
            log.warn('Could not find video room session for handle ID %s' % handle_id)
            return
        log.msg('Video room %s: session terminated to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
        self._cleanup_videoroom_session(videoroom_session)
        self._maybe_destroy_videoroom(videoroom_session.room)
    elif event_type in ('media', 'detached'):
        # ignore
        pass
    elif event_type == 'slowlink':
        # look the session up by handle ID like the other branches do
        try:
            videoroom_session = self.videoroom_sessions[handle_id]
        except KeyError:
            log.warn('Could not find video room session for Janus handle ID %s' % handle_id)
            return
        try:
            account_info = self.accounts_map[videoroom_session.account_id]
        except KeyError:
            # warn instead of raising APIError: nothing catches it here, so it
            # would only surface as an unhandled exception in the operations loop
            log.warn('Could not find account %s for video room session with handle ID %s' % (videoroom_session.account_id, handle_id))
            return
        try:
            uplink = data['event']['uplink']
        except KeyError:
            log.warn('Could not find uplink in slowlink event data')
        else:
            if uplink:
                log.msg('Video room %s: %s has poor download connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
            else:
                log.msg('Video room %s: %s has poor upload connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
    else:
        log.warn('Received unexpected event type %s: data=%s' % (event_type, data))
def _janus_event_plugin_videoroom(self, data):
    """Process a plugin-level event emitted by the Janus videoroom plugin.

    data carries 'handle_id' and 'event'; the event's plugindata must come
    from 'janus.plugin.videoroom' and its data must have a 'videoroom' key
    naming the event type. 'joined' completes a publisher join and reports
    the publishers already present, 'event' reports publishers joining or
    leaving, and 'attached' forwards the SDP offer of a feed subscription.
    """
    handle_id = data['handle_id']
    event = data['event']
    plugin_data = event['plugindata']
    assert(plugin_data['plugin'] == 'janus.plugin.videoroom')
    event_data = plugin_data['data']
    assert 'videoroom' in event_data
    event_type = event_data['videoroom']

    def describe_publishers(room, entries):
        # Translate Janus publisher entries into the structures sent to the
        # client; entries without a matching session in the room are skipped.
        described = []
        for entry in entries:
            janus_id = entry['id']
            display = entry.get('display', '')
            try:
                publisher_session = room[janus_id]
            except KeyError:
                log.warn('Could not find matching session for publisher %s' % janus_id)
                continue
            described.append({
                'id': publisher_session.id,
                'uri': publisher_session.account_id,
                'display_name': display,
            })
        return described

    if event_type == 'joined':
        # a join request succeeded, this is a publisher
        try:
            videoroom_session = self.videoroom_sessions[handle_id]
        except KeyError:
            log.warn('Could not find video room session for handle ID %s' % handle_id)
            return
        room = videoroom_session.room
        videoroom_session.publisher_id = event_data['id']
        room.add(videoroom_session)
        jsep = event.get('jsep', None)
        assert jsep is not None
        accepted = {'sylkrtc': 'videoroom_event',
                    'session': videoroom_session.id,
                    'event': 'state',
                    'data': {'state': 'accepted', 'sdp': jsep['sdp']}}
        self._send_data(json.dumps(accepted))
        # send information about existing publishers
        initial = {'sylkrtc': 'videoroom_event',
                   'session': videoroom_session.id,
                   'event': 'initial_publishers',
                   'data': {'publishers': describe_publishers(room, event_data['publishers'])}}
        self._send_data(json.dumps(initial))
    elif event_type == 'event':
        if 'publishers' in event_data:
            try:
                videoroom_session = self.videoroom_sessions[handle_id]
            except KeyError:
                log.warn('Could not find videoroom session for handle ID %s' % handle_id)
                return
            # send information about new publishers
            joined = {'sylkrtc': 'videoroom_event',
                      'session': videoroom_session.id,
                      'event': 'publishers_joined',
                      'data': {'publishers': describe_publishers(videoroom_session.room, event_data['publishers'])}}
            self._send_data(json.dumps(joined))
        elif 'leaving' in event_data:
            try:
                base_session = self.videoroom_sessions[handle_id]
            except KeyError:
                log.warn('Could not find video room session for handle ID %s' % handle_id)
                return
            janus_publisher_id = event_data['leaving']
            # only publishers this session has a feed entry for are reported
            try:
                publisher_id = base_session.feeds.pop(janus_publisher_id)
            except KeyError:
                return
            left = {'sylkrtc': 'videoroom_event',
                    'session': base_session.id,
                    'event': 'publishers_left',
                    'data': {'publishers': [publisher_id]}}
            self._send_data(json.dumps(left))
        elif not {'started', 'unpublished', 'left'}.intersection(event_data):
            # events carrying any of the three keys above are ignored
            log.warn('Received unexpected plugin "event" event')
    elif event_type == 'attached':
        # sent when a feed is subscribed for a given publisher
        try:
            videoroom_session = self.videoroom_sessions[handle_id]
        except KeyError:
            log.warn('Could not find videoroom session for handle ID %s' % handle_id)
            return
        # get the session which originated the subscription
        base_session = videoroom_session.parent_session
        assert base_session is not None
        jsep = event.get('jsep', None)
        assert jsep is not None
        assert jsep['type'] == 'offer'
        attached = {'sylkrtc': 'videoroom_event',
                    'session': base_session.id,
                    'event': 'feed_attached',
                    'data': {'sdp': jsep['sdp'], 'subscription': videoroom_session.id}}
        self._send_data(json.dumps(attached))
    elif event_type != 'slow_link':
        # 'slow_link' plugin events are ignored
        log.warn('Received unexpected plugin event type %s: plugin_data=%s, event_data=%s' % (event_type, plugin_data, event_data))
diff --git a/sylk/applications/webrtcgateway/web/push.py b/sylk/applications/webrtcgateway/web/push.py
new file mode 100644
index 0000000..511da3c
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/push.py
@@ -0,0 +1,78 @@
+
+import json
+
+from sipsimple.util import ISOTimestamp
+from twisted.internet import defer, reactor
+from twisted.web.client import Agent
+from twisted.web.iweb import IBodyProducer
+from twisted.web.http_headers import Headers
+from zope.interface import implementer
+
+from sylk.applications.webrtcgateway.configuration import GeneralConfig
+from sylk.applications.webrtcgateway.logger import log
+
+__all__ = ['incoming_session', 'missed_session']
+
+
+# HTTP client bound to the Twisted reactor; _send_push_notification issues its requests through it.
+agent = Agent(reactor)
+# Headers passed with every push request. The Authorization value is built once,
+# when this module is imported, from GeneralConfig.firebase_server_key.
+headers = Headers({'User-Agent': ['SylkServer'],
+ 'Content-Type': ['application/json'],
+ 'Authorization': ['key=%s' % GeneralConfig.firebase_server_key]})
+# URL that push notification payloads are POSTed to.
+FIREBASE_API_URL = 'https://fcm.googleapis.com/fcm/send'
+
+
+@implementer(IBodyProducer)
+class StringProducer(object):
+ def __init__(self, data):
+ self.body = data
+ self.length = len(data)
+
+ def startProducing(self, consumer):
+ consumer.write(self.body)
+ return defer.succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+def incoming_session(originator, destination, tokens):
+ for token in tokens:
+ data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}}
+ data['notification']['body'] = 'Incoming session from %s' % originator
+ data['priority'] = 'high'
+ data['time_to_live'] = 60 # don't deliver if phone is out for over a minute
+ data['data']['sylkrtc']['event'] = 'incoming_session'
+ data['data']['sylkrtc']['originator'] = originator
+ data['data']['sylkrtc']['destination'] = destination
+ data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now())
+ _send_push_notification(json.dumps(data))
+
+
+def missed_session(originator, destination, tokens):
+ for token in tokens:
+ data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}}
+ data['notification']['body'] = 'Missed session from %s' % originator
+ data['priority'] = 'high'
+ # No TTL, default is 4 weeks
+ data['data']['sylkrtc']['event'] = 'missed_session'
+ data['data']['sylkrtc']['originator'] = originator
+ data['data']['sylkrtc']['destination'] = destination
+ data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now())
+ _send_push_notification(json.dumps(data))
+
+
+@defer.inlineCallbacks
+def _send_push_notification(payload):
+ if GeneralConfig.firebase_server_key:
+ try:
+ r = yield agent.request('POST', FIREBASE_API_URL, headers, StringProducer(payload))
+ except Exception, e:
+ log.msg('Error sending Firebase message: %s', e)
+ else:
+ if r.code != 200:
+ log.warn('Error sending Firebase message: %s' % r.phrase)
+ else:
+ log.warn('Cannot send push notification: no Firebase server key configured')
diff --git a/sylk/applications/webrtcgateway/web/storage.py b/sylk/applications/webrtcgateway/web/storage.py
new file mode 100644
index 0000000..c890405
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/storage.py
@@ -0,0 +1,43 @@
+
+__all__ = ['TokenStorage']
+
+import cPickle as pickle
+import os
+
+from application.python.types import Singleton
+from collections import defaultdict
+from sipsimple.threading import run_in_thread
+
+from sylk.configuration import ServerConfig
+
+
+class TokenStorage(object):
+ __metaclass__ = Singleton
+
+ def __init__(self):
+ self._tokens = defaultdict(set)
+
+ @run_in_thread('file-io')
+ def _save(self):
+ with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f:
+ pickle.dump(self._tokens, f)
+
+ @run_in_thread('file-io')
+ def load(self):
+ try:
+ tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb'))
+ except Exception:
+ pass
+ else:
+ self._tokens.update(tokens)
+
+ def __getitem__(self, key):
+ return self._tokens[key]
+
+ def add(self, account, token):
+ self._tokens[account].add(token)
+ self._save()
+
+ def remove(self, account, token):
+ self._tokens[account].discard(token)
+ self._save()
diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample
index 8b3ca9c..3254c95 100644
--- a/webrtcgateway.ini.sample
+++ b/webrtcgateway.ini.sample
@@ -1,48 +1,57 @@
; SylkServer WebRTC gateway configuration file
;
; For the gateway to work Janus needs to be properly installed and configured,
; please refer to README.webrtc for detailed instructions
;
[General]
; List of allowed web origins. The connection origin (Origin header in the
; HTTP request) will be checked against the list defined here, if the domain
; is not allowed the connection will be refused.
; * (the default) means any
; web_origins = *
; Proxy used for outbound SIP traffic
; outbound_sip_proxy =
; List of allowed SIP domains for managing accounts
; sip_domains = *
; Boolean indicating if the WebSocket messages sent to/from clients should be logged
; to a file
; trace_websocket = False
; WebSocket Ping frames are sent at the configured interval, this helps detect dead
; client connections
; websocket_ping_interval = 120
+; IP and port for the HTTP management interface
+; http_management_interface = 127.0.0.1:20888
+
+; Shared secret for the HTTP management interface (Authorization: key=THE_KEY)
+; http_management_auth_secret =
+
+; Server key for Firebase Cloud Messaging
+; firebase_server_key =
+
[Janus]
; URL pointing to the Janus API endpoint (only WebSocket is supported)
; api_url = ws://127.0.0.1:8188
; API secret shared with Janus (must match the value in janus.cfg)
; A random UUID value is recommended, a new value can be generated with
; the following command:
; python -c 'import uuid; print(uuid.uuid4().hex)'
api_secret = 0745f2f74f34451c89343afcdcae5809
; Boolean indicating if the messages between SylkServer and Janus should be logged to
; a file
; trace_janus = False
; Per room configuration options
; [room1@videoconference.example.com]
; record = True
; access_policy = deny, allow
; deny = all
; allow = domain1.com, test1@example.com, test2@example.com

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 7:15 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409016
Default Alt Text
(89 KB)

Event Timeline