Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159609
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
89 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sylk/applications/webrtcgateway/__init__.py b/sylk/applications/webrtcgateway/__init__.py
index a0338e1..7ee2443 100644
--- a/sylk/applications/webrtcgateway/__init__.py
+++ b/sylk/applications/webrtcgateway/__init__.py
@@ -1,30 +1,38 @@
from sylk.applications import SylkApplication
from sylk.applications.webrtcgateway.logger import log
from sylk.applications.webrtcgateway.util import IdentityFormatter
from sylk.applications.webrtcgateway.web import WebHandler
+from sylk.applications.webrtcgateway.web.admin import AdminWebHandler
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
class WebRTCGatewayApplication(SylkApplication):
def __init__(self):
self.web_handler = WebHandler()
+ self.admin_web_handler = AdminWebHandler()
def start(self):
self.web_handler.start()
+ self.admin_web_handler.start()
+ # Load tokens from storage
+ storage = TokenStorage()
+ storage.load()
def stop(self):
self.web_handler.stop()
+ self.admin_web_handler.stop()
def incoming_session(self, session):
log.msg(u'New incoming session %s from %s rejected' % (session.call_id, IdentityFormatter.format(session.remote_identity)))
session.reject(403)
def incoming_subscription(self, request, data):
request.reject(405)
def incoming_referral(self, request, data):
request.reject(405)
def incoming_message(self, request, data):
request.reject(405)
diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py
index 40b6871..17dcc00 100644
--- a/sylk/applications/webrtcgateway/configuration.py
+++ b/sylk/applications/webrtcgateway/configuration.py
@@ -1,127 +1,134 @@
import os
import re
from application.configuration import ConfigFile, ConfigSection, ConfigSetting
-from application.configuration.datatypes import StringList
+from application.configuration.datatypes import NetworkAddress, StringList
from sylk.configuration import ServerConfig
from sylk.configuration.datatypes import Path, SIPProxyAddress
from sylk.resources import VarResources
__all__ = ['get_room_config']
# Datatypes
class AccessPolicyValue(str):
allowed_values = ('allow,deny', 'deny,allow')
def __new__(cls, value):
value = re.sub('\s', '', value)
if value not in cls.allowed_values:
raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values))
return str.__new__(cls, value)
class Domain(str):
domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$")
def __new__(cls, value):
value = str(value)
if not cls.domain_re.match(value):
raise ValueError("illegal domain: %s" % value)
return str.__new__(cls, value)
class SIPAddress(str):
def __new__(cls, address):
address = str(address)
address = address.replace('@', '%40', address.count('@')-1)
try:
username, domain = address.split('@')
Domain(domain)
except ValueError:
raise ValueError("illegal SIP address: %s, must be in user@domain format" % address)
return str.__new__(cls, address)
class PolicySettingValue(list):
def __init__(self, value):
if isinstance(value, (tuple, list)):
l = [str(x) for x in value]
elif isinstance(value, basestring):
if value.lower() in ('none', ''):
return list.__init__(self, [])
elif value.lower() in ('any', 'all', '*'):
return list.__init__(self, ['*'])
else:
l = re.split(r'\s*,\s*', value)
else:
raise TypeError("value must be a string, list or tuple")
values = []
for item in l:
if '@' in item:
values.append(SIPAddress(item))
else:
values.append(Domain(item))
return list.__init__(self, values)
def match(self, uri):
if self == ['*']:
return True
(user, domain) = uri.split("@")
uri = re.sub('^(sip:|sips:)', '', str(uri))
return uri in self or domain in self
+class ManagementInterfaceAddress(NetworkAddress):
+ default_port = 20888
+
+
# Configuration objects
class GeneralConfig(ConfigSection):
__cfgfile__ = 'webrtcgateway.ini'
__section__ = 'General'
web_origins = ConfigSetting(type=StringList, value=['*'])
sip_domains = ConfigSetting(type=StringList, value=['*'])
outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None)
trace_websocket = False
websocket_ping_interval = 120
recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings')))
-
-
-class RoomConfig(ConfigSection):
- __cfgfile__ = 'webrtcgateway.ini'
-
- record = False
- access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
- allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
- deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
+ http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1'))
+ http_management_auth_secret = ConfigSetting(type=str, value=None)
+ firebase_server_key = ConfigSetting(type=str, value=None)
class JanusConfig(ConfigSection):
__cfgfile__ = 'webrtcgateway.ini'
__section__ = 'Janus'
api_url = 'ws://127.0.0.1:8188'
api_secret = '0745f2f74f34451c89343afcdcae5809'
trace_janus = False
+class RoomConfig(ConfigSection):
+ __cfgfile__ = 'webrtcgateway.ini'
+
+ record = False
+ access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
+ allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
+ deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
+
+
class Configuration(object):
def __init__(self, data):
self.__dict__.update(data)
def get_room_config(room):
config_file = ConfigFile(RoomConfig.__cfgfile__)
section = config_file.get_section(room)
if section is not None:
RoomConfig.read(section=room)
config = Configuration(dict(RoomConfig))
RoomConfig.reset()
else:
# Apply general policy
config = Configuration(dict(RoomConfig))
return config
diff --git a/sylk/applications/webrtcgateway/models/sylkrtc.py b/sylk/applications/webrtcgateway/models/sylkrtc.py
index 1ff432b..934ac42 100644
--- a/sylk/applications/webrtcgateway/models/sylkrtc.py
+++ b/sylk/applications/webrtcgateway/models/sylkrtc.py
@@ -1,191 +1,197 @@
__all__ = ['AccountAddRequest', 'AccountRemoveRequest', 'AccountRegisterRequest', 'AccountUnregisterRequest',
'SessionCreateRequest', 'SessionAnswerRequest', 'SessionTrickleRequest', 'SessionTerminateRequest',
'AckResponse', 'ErrorResponse',
'ReadyEvent']
import re
from jsonmodels import models, fields, errors, validators
from sipsimple.core import SIPURI, SIPCoreError
SIP_PREFIX_RE = re.compile('^sips?:')
class DefaultValueField(fields.BaseField):
def __init__(self, value):
self.default_value = value
super(DefaultValueField, self).__init__()
def validate(self, value):
if value != self.default_value:
raise errors.ValidationError('%s doesn\'t match the expected value %s' % (value, self.default_value))
def get_default_value(self):
return self.default_value
def URIValidator(value):
account = SIP_PREFIX_RE.sub('', value)
try:
SIPURI.parse('sip:%s' % account)
except SIPCoreError:
raise errors.ValidationError('invalid account: %s' % value)
def URIListValidator(values):
for item in values:
URIValidator(item)
class OptionsValidator(object):
def __init__(self, options):
self.options = options
def __call__(self, value):
if value not in self.options:
raise errors.ValidationError('invalid option: %s' % value)
# Base models
class SylkRTCRequestBase(models.Base):
transaction = fields.StringField(required=True)
class SylkRTCResponseBase(models.Base):
transaction = fields.StringField(required=True)
# Miscelaneous models
class AckResponse(SylkRTCResponseBase):
sylkrtc = DefaultValueField('ack')
class ErrorResponse(SylkRTCResponseBase):
sylkrtc = DefaultValueField('error')
error = fields.StringField(required=True)
class ICECandidate(models.Base):
candidate = fields.StringField(required=True)
sdpMLineIndex = fields.IntField(required=True)
sdpMid = fields.StringField(required=True)
class ReadyEvent(models.Base):
sylkrtc = DefaultValueField('event')
event = DefaultValueField('ready')
# Account models
class AccountRequestBase(SylkRTCRequestBase):
account = fields.StringField(required=True,
validators=[URIValidator])
class AccountAddRequest(AccountRequestBase):
sylkrtc = DefaultValueField('account-add')
password = fields.StringField(required=True,
validators=[validators.Length(minimum_value=1, maximum_value=9999)])
display_name = fields.StringField(required=False)
user_agent = fields.StringField(required=False)
class AccountRemoveRequest(AccountRequestBase):
sylkrtc = DefaultValueField('account-remove')
class AccountRegisterRequest(AccountRequestBase):
sylkrtc = DefaultValueField('account-register')
class AccountUnregisterRequest(AccountRequestBase):
sylkrtc = DefaultValueField('account-unregister')
+class AccountDeviceTokenRequest(AccountRequestBase):
+ sylkrtc = DefaultValueField('account-devicetoken')
+ old_token = fields.StringField(required=False)
+ new_token = fields.StringField(required=True)
+
+
# Session models
class SessionRequestBase(SylkRTCRequestBase):
session = fields.StringField(required=True)
class SessionCreateRequest(SessionRequestBase):
sylkrtc = DefaultValueField('session-create')
account = fields.StringField(required=True,
validators=[URIValidator])
uri = fields.StringField(required=True,
validators=[URIValidator])
sdp = fields.StringField(required=True)
class SessionAnswerRequest(SessionRequestBase):
sylkrtc = DefaultValueField('session-answer')
sdp = fields.StringField(required=True)
class SessionTrickleRequest(SessionRequestBase):
sylkrtc = DefaultValueField('session-trickle')
candidates = fields.ListField([ICECandidate])
class SessionTerminateRequest(SessionRequestBase):
sylkrtc = DefaultValueField('session-terminate')
# VideoRoom models
class VideoRoomRequestBase(SylkRTCRequestBase):
session = fields.StringField(required=True)
class VideoRoomJoinRequest(VideoRoomRequestBase):
sylkrtc = DefaultValueField('videoroom-join')
account = fields.StringField(required=True,
validators=[URIValidator])
uri = fields.StringField(required=True,
validators=[URIValidator])
sdp = fields.StringField(required=True)
class VideoRoomControlTrickleRequest(models.Base):
# ID for the subscriber session, if specified, otherwise the publisher is considered
session = fields.StringField(required=False)
candidates = fields.ListField([ICECandidate])
class VideoRoomControlFeedAttachRequest(models.Base):
session = fields.StringField(required=True)
publisher = fields.StringField(required=True)
class VideoRoomControlFeedAnswerRequest(models.Base):
session = fields.StringField(required=True)
sdp = fields.StringField(required=True)
class VideoRoomControlFeedDetachRequest(models.Base):
session = fields.StringField(required=True)
class VideoRoomControlInviteParticipantsRequest(models.Base):
participants = fields.ListField([str, unicode], validators=[URIListValidator])
class VideoRoomControlRequest(VideoRoomRequestBase):
sylkrtc = DefaultValueField('videoroom-ctl')
option = fields.StringField(required=True,
validators=[OptionsValidator(['trickle', 'feed-attach', 'feed-answer', 'feed-detach', 'invite-participants'])])
# all other options should have optional fields below, and the application needs to do a little validation
trickle = fields.EmbeddedField(VideoRoomControlTrickleRequest, required=False)
feed_attach = fields.EmbeddedField(VideoRoomControlFeedAttachRequest, required=False)
feed_answer = fields.EmbeddedField(VideoRoomControlFeedAnswerRequest, required=False)
feed_detach = fields.EmbeddedField(VideoRoomControlFeedDetachRequest, required=False)
invite_participants = fields.EmbeddedField(VideoRoomControlInviteParticipantsRequest, required=False)
class VideoRoomTerminateRequest(VideoRoomRequestBase):
sylkrtc = DefaultValueField('videoroom-terminate')
diff --git a/sylk/applications/webrtcgateway/web/admin.py b/sylk/applications/webrtcgateway/web/admin.py
new file mode 100644
index 0000000..19f3d01
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/admin.py
@@ -0,0 +1,101 @@
+
+import json
+
+from application.python.types import Singleton
+from klein import Klein
+from twisted.internet import reactor
+from twisted.web.server import Site
+
+from sylk.applications.webrtcgateway.configuration import GeneralConfig
+from sylk.applications.webrtcgateway.logger import log
+from sylk.applications.webrtcgateway.web import push
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
+
+__all__ = ['AdminWebHandler']
+
+
+class AuthError(Exception): pass
+
+
+class AdminWebHandler(object):
+ __metaclass__ = Singleton
+
+ app = Klein()
+
+ def __init__(self):
+ self.listener = None
+
+ def start(self):
+ host, port = GeneralConfig.http_management_interface
+ self.listener = reactor.listenTCP(port, Site(self.app.resource()), interface=host)
+ log.msg('Admin web handler started at http://%s:%d' % (host, port))
+
+ def stop(self):
+ if self.listener is not None:
+ self.listener.stopListening()
+ self.listener = None
+
+ # Admin web API
+
+ def _check_auth(self, request):
+ auth_secret = GeneralConfig.http_management_auth_secret
+ if auth_secret:
+ auth_headers = request.requestHeaders.getRawHeaders('Authorization', default=None)
+ if not auth_headers or auth_headers[0] != auth_secret:
+ raise AuthError()
+
+ @app.handle_errors(AuthError)
+ def auth_error(self, request, failure):
+ request.setResponseCode(403)
+ return 'Authentication error'
+
+ @app.route('/incoming_session', methods=['POST'])
+ def incoming_session(self, request):
+ self._check_auth(request)
+ request.setHeader('Content-Type', 'application/json')
+ try:
+ data = json.load(request.content)
+ originator = data['originator']
+ destination = data['destination']
+ except Exception, e:
+ return json.dumps({'success': False, 'error': str(e)})
+ else:
+ storage = TokenStorage()
+ tokens = storage[destination]
+ push.incoming_session(originator, destination, tokens)
+ return json.dumps({'success': True})
+
+ @app.route('/missed_session', methods=['POST'])
+ def missed_session(self, request):
+ self._check_auth(request)
+ request.setHeader('Content-Type', 'application/json')
+ try:
+ data = json.load(request.content)
+ originator = data['originator']
+ destination = data['destination']
+ except Exception, e:
+ return json.dumps({'success': False, 'error': str(e)})
+ else:
+ storage = TokenStorage()
+ tokens = storage[destination]
+ push.missed_session(originator, destination, tokens)
+ return json.dumps({'success': True})
+
+ @app.route('/tokens/<string:account>')
+ def get_tokens(self, request, account):
+ self._check_auth(request)
+ request.setHeader('Content-Type', 'application/json')
+ storage = TokenStorage()
+ tokens = storage[account]
+ return json.dumps({'tokens': list(tokens)})
+
+ @app.route('/tokens/<string:account>/<string:token>', methods=['POST', 'DELETE'])
+ def process_token(self, request, account, token):
+ self._check_auth(request)
+ request.setHeader('Content-Type', 'application/json')
+ storage = TokenStorage()
+ if request.method == 'POST':
+ storage.add(account, token)
+ elif request.method == 'DELETE':
+ storage.remove(account, token)
+ return json.dumps({'success': True})
diff --git a/sylk/applications/webrtcgateway/web/factory.py b/sylk/applications/webrtcgateway/web/factory.py
index 52c4be3..317a5d4 100644
--- a/sylk/applications/webrtcgateway/web/factory.py
+++ b/sylk/applications/webrtcgateway/web/factory.py
@@ -1,79 +1,80 @@
import weakref
from application.notification import IObserver, NotificationCenter
from application.python import Null
+from application.python.types import Singleton
from autobahn.twisted.websocket import WebSocketServerFactory
from zope.interface import implements
from sylk.applications.webrtcgateway.web.protocol import SylkWebSocketServerProtocol, SYLK_WS_PROTOCOL
class VideoRoomContainer(object):
def __init__(self):
self._rooms = set()
self._uri_map = weakref.WeakValueDictionary()
self._id_map = weakref.WeakValueDictionary()
def add(self, room):
self._rooms.add(room)
self._uri_map[room.uri] = room
self._id_map[room.id] = room
def remove(self, value):
if isinstance(value, int):
room = self._id_map.get(value, None)
elif isinstance(value, basestring):
room = self._uri_map.get(value, None)
else:
room = value
self._rooms.discard(room)
def clear(self):
self._rooms.clear()
def __len__(self):
return len(self._rooms)
def __iter__(self):
return iter(self._rooms)
def __getitem__(self, item):
if isinstance(item, int):
return self._id_map[item]
elif isinstance(item, basestring):
return self._uri_map[item]
else:
raise KeyError('%s not found' % item)
def __contains__(self, item):
return item in self._id_map or item in self._uri_map or item in self._rooms
class SylkWebSocketServerFactory(WebSocketServerFactory):
implements(IObserver)
protocol = SylkWebSocketServerProtocol
connections = set()
videorooms = VideoRoomContainer()
backend = None # assigned by WebHandler
def __init__(self, *args, **kw):
super(SylkWebSocketServerFactory, self).__init__(*args, **kw)
notification_center = NotificationCenter()
notification_center.add_observer(self, name='JanusBackendDisconnected')
def buildProtocol(self, addr):
protocol = self.protocol()
protocol.factory = self
protocol.backend = self.backend
return protocol
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_JanusBackendDisconnected(self, notification):
for conn in self.connections.copy():
conn.dropConnection(abort=True)
self.videorooms.clear()
diff --git a/sylk/applications/webrtcgateway/web/handler.py b/sylk/applications/webrtcgateway/web/handler.py
index 381c583..611befd 100644
--- a/sylk/applications/webrtcgateway/web/handler.py
+++ b/sylk/applications/webrtcgateway/web/handler.py
@@ -1,1311 +1,1338 @@
import json
import os
import random
import re
import uuid
import weakref
from application.python import Null
from application.system import makedirs
from eventlib import coros, proc
from eventlib.twistedutil import block_on
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config
from sylk.applications.webrtcgateway.logger import log
from sylk.applications.webrtcgateway.models import sylkrtc
from sylk.applications.webrtcgateway.util import GreenEvent
+from sylk.applications.webrtcgateway.web.storage import TokenStorage
SIP_PREFIX_RE = re.compile('^sips?:')
class ACLValidationError(Exception): pass
sylkrtc_models = {
# account management
'account-add' : sylkrtc.AccountAddRequest,
'account-remove' : sylkrtc.AccountRemoveRequest,
'account-register' : sylkrtc.AccountRegisterRequest,
'account-unregister' : sylkrtc.AccountUnregisterRequest,
+ 'account-devicetoken' : sylkrtc.AccountDeviceTokenRequest,
# session management
'session-create' : sylkrtc.SessionCreateRequest,
'session-answer' : sylkrtc.SessionAnswerRequest,
'session-trickle' : sylkrtc.SessionTrickleRequest,
'session-terminate' : sylkrtc.SessionTerminateRequest,
# video conference management
'videoroom-join' : sylkrtc.VideoRoomJoinRequest,
'videoroom-ctl' : sylkrtc.VideoRoomControlRequest,
'videoroom-terminate' : sylkrtc.VideoRoomTerminateRequest,
}
class AccountInfo(object):
def __init__(self, id, password, display_name=None, user_agent=None):
self.id = id
self.password = password
self.display_name = display_name
self.user_agent = user_agent
self.registration_state = None
self.janus_handle_id = None
@property
def uri(self):
return 'sip:%s' % self.id
class SessionPartyIdentity(object):
def __init__(self, uri, display_name=''):
self.uri = uri
self.display_name = display_name
class SIPSessionInfo(object):
def __init__(self, id):
self.id = id
self.direction = None
self.state = None
self.account_id = None
self.local_identity = None # instance of SessionPartyIdentity
self.remote_identity = None # instance of SessionPartyIdentity
self.janus_handle_id = None
self.ice_media_negotiation_started = False
self.ice_media_negotiation_ended = False
def init_outgoing(self, account_id, destination):
self.account_id = account_id
self.direction = 'outgoing'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(destination)
def init_incoming(self, account_id, originator, originator_display_name=''):
self.account_id = account_id
self.direction = 'incoming'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(originator, originator_display_name)
class VideoRoom(object):
def __init__(self, uri):
self.config = get_room_config(uri)
self.uri = uri
self.record = self.config.record
self.rec_dir = os.path.join(GeneralConfig.recording_dir, '%s/' % uri)
self.id = random.getrandbits(32) # janus needs numeric room names
self.destroyed = False
self._session_id_map = weakref.WeakValueDictionary()
self._publisher_id_map = weakref.WeakValueDictionary()
if self.record:
makedirs(self.rec_dir, 0755)
log.msg('Video room %s created with recording in %s' % (self.uri, self.rec_dir))
else:
log.msg('Video room %s created without recording' % self.uri)
def add(self, session):
assert session.publisher_id is not None
assert session.id not in self._session_id_map
assert session.publisher_id not in self._publisher_id_map
self._session_id_map[session.id] = session
self._publisher_id_map[session.publisher_id] = session
log.msg('Video room %s: added session %s for %s' % (self.uri, session.id, session.account_id))
def __getitem__(self, key):
try:
return self._session_id_map[key]
except KeyError:
return self._publisher_id_map[key]
def __len__(self):
return len(self._session_id_map)
class VideoRoomSessionInfo(object):
def __init__(self, id):
self.id = id
self.account_id = None
self.type = None # publisher / subscriber
self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers
self.janus_handle_id = None
self.room = None
self.parent_session = None
self.feeds = {} # janus publisher ID -> our publisher ID
self.ice_media_negotiation_started = False
self.ice_media_negotiation_ended = False
def initialize(self, account_id, type, room):
assert type in ('publisher', 'subscriber')
self.account_id = account_id
self.type = type
self.room = room
log.msg('Video room %s: new session %s initialized by %s' % (self.room.uri, self.id, self.account_id))
def __repr__(self):
return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type)
class VideoRoomSessionContainer(object):
def __init__(self):
self._sessions = set()
self._janus_handle_map = weakref.WeakValueDictionary()
self._id_map = weakref.WeakValueDictionary()
def add(self, session):
assert session not in self._sessions
assert session.janus_handle_id not in self._janus_handle_map
assert session.id not in self._id_map
self._sessions.add(session)
self._janus_handle_map[session.janus_handle_id] = session
self._id_map[session.id] = session
def remove(self, session):
self._sessions.discard(session)
def count(self):
return len(self._sessions)
def clear(self):
self._sessions.clear()
def __len__(self):
return len(self._sessions)
def __iter__(self):
return iter(self._sessions)
def __getitem__(self, key):
try:
return self._id_map[key]
except KeyError:
return self._janus_handle_map[key]
def __contains__(self, item):
return item in self._id_map or item in self._janus_handle_map or item in self._sessions
class Operation(object):
__slots__ = ('name', 'data')
def __init__(self, name, data):
self.name = name
self.data = data
class APIError(Exception):
pass
class ConnectionHandler(object):
def __init__(self, protocol):
self.protocol = protocol
self.janus_session_id = None
self.accounts_map = {} # account ID -> account
self.account_handles_map = {} # Janus handle ID -> account
self.sessions_map = {} # session ID -> session
self.session_handles_map = {} # Janus handle ID -> session
self.videoroom_sessions = VideoRoomSessionContainer() # session ID / janus handle ID -> session
self.ready_event = GreenEvent()
self.resolver = DNSLookup()
self.proc = proc.spawn(self._operations_handler)
self.operations_queue = coros.queue()
@property
def end_point_address(self):
return self.protocol.peer
def start(self):
self._create_janus_session()
def stop(self):
if self.ready_event.is_set():
assert self.janus_session_id is not None
self.protocol.backend.janus_stop_keepalive(self.janus_session_id)
self.protocol.backend.janus_destroy_session(self.janus_session_id)
if self.proc is not None:
self.proc.kill()
self.proc = None
# cleanup
self.ready_event.clear()
self.accounts_map.clear()
self.account_handles_map.clear()
self.sessions_map.clear()
self.session_handles_map.clear()
self.videoroom_sessions.clear()
self.janus_session_id = None
self.protocol = None
def handle_message(self, data):
try:
request_type = data.pop('sylkrtc')
except KeyError:
log.warn('Error getting WebSocket message type')
return
self.ready_event.wait()
try:
model = sylkrtc_models[request_type]
except KeyError:
log.warn('Unknown request type: %s' % request_type)
return
request = model(**data)
try:
request.validate()
except Exception, e:
log.error('%s: %s' % (request_type, e))
if request.transaction:
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
return
op = Operation(request_type, request)
self.operations_queue.send(op)
def validate_acl(self, room_uri, from_uri):
cfg = get_room_config(room_uri)
if cfg.access_policy == 'allow,deny':
if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri):
return
raise ACLValidationError
else:
if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri):
raise ACLValidationError
# internal methods (not overriding / implementing the protocol API)
def _send_response(self, response):
response.validate()
self._send_data(json.dumps(response.to_struct()))
def _send_data(self, data):
if GeneralConfig.trace_websocket:
self.protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data)
self.protocol.sendMessage(data, False)
def _cleanup_session(self, session):
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is noting to do here
return
self.sessions_map.pop(session.id)
if session.direction == 'outgoing':
# Destroy plugin handle for outgoing sessions. For incoming ones it's the
# same as the account handle, so don't
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
self.session_handles_map.pop(session.janus_handle_id)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _cleanup_videoroom_session(self, session):
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is noting to do here
return
if session in self.videoroom_sessions:
self.videoroom_sessions.remove(session)
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _maybe_destroy_videoroom(self, videoroom):
if videoroom is None:
return
@run_in_green_thread
def f():
if self.protocol is None:
# The connection was closed
return
# destroy the room if empty
if not videoroom and not videoroom.destroyed:
videoroom.destroyed = True
self.protocol.factory.videorooms.remove(videoroom)
# create a handle to do the cleanup
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
data = {'request': 'destroy',
'room': videoroom.id}
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except Exception:
pass
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
log.msg('Video room %s destroyed' % videoroom.uri)
# don't destroy it immediately
reactor.callLater(5, f)
@run_in_green_thread
def _create_janus_session(self):
if self.ready_event.is_set():
self._send_response(sylkrtc.ReadyEvent())
return
try:
self.janus_session_id = block_on(self.protocol.backend.janus_create_session())
self.protocol.backend.janus_start_keepalive(self.janus_session_id)
except Exception, e:
log.warn('Error creating session, disconnecting: %s' % e)
self.protocol.disconnect(3000, unicode(e))
return
self._send_response(sylkrtc.ReadyEvent())
self.ready_event.set()
def _lookup_sip_proxy(self, account):
sip_uri = SIPURI.parse('sip:%s' % account)
# The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed
# as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use
# caching to avoid long delays, we randomize the results matching the highest priority route's
# transport.
proxy = GeneralConfig.outbound_sip_proxy
if proxy is not None:
proxy_uri = SIPURI(host=proxy.host,
port=proxy.port,
parameters={'transport': proxy.transport})
else:
proxy_uri = SIPURI(host=sip_uri.host)
settings = SIPSimpleSettings()
routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait()
if not routes:
raise DNSLookupError('no results found')
# Get all routes with the highest priority transport and randomly pick one
route = random.choice([r for r in routes if r.transport == routes[0].transport])
# Build a proxy URI Sofia-SIP likes
return '%s:%s:%d%s' % ('sips' if route.transport == 'tls' else 'sip',
route.address,
route.port,
';transport=%s' % route.transport if route.transport != 'tls' else '')
def _handle_conference_invite(self, originator, room_uri, participants):
for p in participants:
try:
account_info = self.accounts_map[p]
except KeyError:
continue
data = dict(sylkrtc='account_event',
account=account_info.id,
event='conference_invite',
data=dict(originator=dict(uri=originator.id, display_name=originator.display_name),
room=room_uri))
log.msg('Video room %s: invitation from %s to %s' % (room_uri, originator.id, account_info.id))
self._send_data(json.dumps(data))
def _handle_janus_event_sip(self, handle_id, event_type, event):
# TODO: use a model
op = Operation('janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event))
self.operations_queue.send(op)
def _handle_janus_event_videoroom(self, handle_id, event_type, event):
# TODO: use a model
op = Operation('janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event))
self.operations_queue.send(op)
def _operations_handler(self):
while True:
op = self.operations_queue.wait()
handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null)
try:
handler(op.data)
except Exception:
log.exception('Unhandled exception in operation "%s"' % op.name)
del op, handler
def _OH_account_add(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
password = request.password
display_name = request.display_name
user_agent = request.user_agent
try:
if account in self.accounts_map:
raise APIError('Account %s already added' % account)
# check if domain is acceptable
domain = account.partition('@')[2]
if not {'*', domain}.intersection(GeneralConfig.sip_domains):
raise APIError('SIP domain not allowed: %s' % domain)
# Create and store our mapping
account_info = AccountInfo(account, password, display_name, user_agent)
self.accounts_map[account_info.id] = account_info
except APIError, e:
log.error('account_add: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s added using %s at %s' % (account, user_agent, self.end_point_address))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_remove(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map.pop(account)
except KeyError:
raise APIError('Unknown account specified: %s' % account)
handle_id = account_info.janus_handle_id
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
except APIError, e:
log.error('account_remove: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s removed' % account)
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_register(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
try:
proxy = self._lookup_sip_proxy(account)
except DNSLookupError:
raise APIError('DNS lookup error')
handle_id = account_info.janus_handle_id
if handle_id is not None:
# Destroy the existing plugin handle
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
account_info.janus_handle_id = None
# Create a plugin handle
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
account_info.janus_handle_id = handle_id
self.account_handles_map[handle_id] = account_info
data = {'request': 'register',
'username': account_info.uri,
'display_name': account_info.display_name,
'user_agent': account_info.user_agent,
'ha1_secret': account_info.password,
'proxy': proxy}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except APIError, e:
log.error('account-register: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s will register using %s' % (account, account_info.user_agent))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_unregister(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
handle_id = account_info.janus_handle_id
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
account_info.janus_handle_id = None
self.account_handles_map.pop(handle_id)
except APIError, e:
log.error('account-unregister: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s will unregister' % account)
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
+ def _OH_account_devicetoken(self, request):
+ # extract the fields to avoid going through the descriptor several times
+ account = request.account
+ old_token = request.old_token
+ new_token = request.new_token
+
+ try:
+ try:
+ account_info = self.accounts_map[account]
+ except KeyError:
+ raise APIError('Unknown account specified: %s' % account)
+
+ storage = TokenStorage()
+ if old_token is not None:
+ storage.remove(account, old_token)
+ log.msg('Removed device token %s... for account %s', (old_token[:5], account))
+ if new_token is not None:
+ storage.add(account, new_token)
+ log.msg('Added device token %s... for account %s', (new_token[:5], account))
+ except APIError, e:
+ log.error('account-devicetoken: %s' % e)
+ self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
+ else:
+ self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
+
def _OH_session_create(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
session = request.session
uri = request.uri
sdp = request.sdp
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
if session in self.sessions_map:
raise APIError('Session ID (%s) already in use' % session)
# Create a new plugin handle and 'register' it, without actually doing so
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
try:
proxy = self._lookup_sip_proxy(account_info.id)
except DNSLookupError:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
raise APIError('DNS lookup error')
data = {'request': 'register',
'username': account_info.uri,
'display_name': account_info.display_name,
'user_agent': account_info.user_agent,
'ha1_secret': account_info.password,
'proxy': proxy,
'send_register': False}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
session_info = SIPSessionInfo(session)
session_info.janus_handle_id = handle_id
session_info.init_outgoing(account, uri)
# TODO: create a "SessionContainer" object combining the 2
self.sessions_map[session_info.id] = session_info
self.session_handles_map[handle_id] = session_info
data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'}
jsep = {'type': 'offer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
except APIError, e:
log.error('session-create: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Outgoing session %s from %s to %s created using %s from %s' % (session, account, uri, account_info.user_agent, self.end_point_address))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_answer(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
sdp = request.sdp
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.direction != 'incoming':
raise APIError('Cannot answer outgoing session')
if session_info.state != 'connecting':
raise APIError('Invalid state for session answer')
data = {'request': 'accept'}
jsep = {'type': 'answer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep))
except APIError, e:
log.error('session-answer: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('%s answered session %s' % (session_info.account_id, session))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_trickle(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
candidates = [c.to_struct() for c in request.candidates]
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.state == 'terminated':
raise APIError('Session is terminated')
try:
account_info = self.accounts_map[session_info.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % session_info.account_id)
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates))
except APIError, e:
#log.error('session-trickle: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
if candidates:
if not session_info.ice_media_negotiation_started:
log.msg('Session %s: ICE negotiation started by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
session_info.ice_media_negotiation_started = True
else:
log.msg('Session %s: ICE negotiation ended by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
session_info.ice_media_negotiation_ended = True
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_terminate(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.state not in ('connecting', 'progress', 'accepted', 'established'):
raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state)
if session_info.direction == 'incoming' and session_info.state == 'connecting':
data = {'request': 'decline', 'code': 486}
else:
data = {'request': 'hangup'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data))
except APIError, e:
log.error('session-terminate: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('%s terminated session %s' % (session_info.account_id, session))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_videoroom_join(self, request):
account = request.account
session = request.session
uri = request.uri
sdp = request.sdp
videoroom = None
try:
try:
self.validate_acl(uri, account)
except ACLValidationError:
raise APIError('%s is not allowed to join room %s' % (account, uri))
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
if session in self.videoroom_sessions:
raise APIError('Video room session ID (%s) already in use' % session)
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# create the room if it doesn't exist
try:
videoroom = self.protocol.factory.videorooms[uri]
except KeyError:
videoroom = VideoRoom(uri)
self.protocol.factory.videorooms.add(videoroom)
data = {'request': 'create',
'room': videoroom.id,
'publishers': 10,
'record': videoroom.record,
'rec_dir': videoroom.rec_dir
}
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except Exception, e:
code = getattr(e, 'code', -1)
if code != 427: # 417 == room exists
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
raise APIError(str(e))
# join the room
data = {'request': 'joinandconfigure',
'room': videoroom.id,
'ptype': 'publisher',
'audio': True,
'video': True
}
if account_info.display_name:
data['display'] = account_info.display_name
jsep = {'type': 'offer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
videoroom_session = VideoRoomSessionInfo(session)
videoroom_session.janus_handle_id = handle_id
videoroom_session.initialize(account, 'publisher', videoroom)
self.videoroom_sessions.add(videoroom_session)
except APIError, e:
log.error('videoroom-join: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
self._maybe_destroy_videoroom(videoroom)
else:
log.msg('Video room %s: joined by %s using %s (%d participants present) from %s' % (videoroom.uri, account, account_info.user_agent, self.videoroom_sessions.count(), self.end_point_address))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='progress'))
self._send_data(json.dumps(data))
def _OH_videoroom_ctl(self, request):
if request.option == 'trickle':
trickle = request.trickle
if not trickle:
log.error('videoroom-ctl: missing field')
return
candidates = [c.to_struct() for c in trickle.candidates]
session = trickle.session or request.session
try:
try:
videoroom_session = self.videoroom_sessions[session]
except KeyError:
raise APIError('trickle: unknown video room session ID specified: %s' % session)
try:
account_info = self.accounts_map[videoroom_session.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % videoroom_session.account_id)
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates))
except APIError, e:
#log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
if candidates:
if not videoroom_session.ice_media_negotiation_started:
log.msg('Video room %s: ICE negotiation started by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
videoroom_session.ice_media_negotiation_started = True
else:
log.msg('Video room %s: ICE negotiation ended by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
videoroom_session.ice_media_negotiation_ended = True
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-attach':
feed_attach = request.feed_attach
if not feed_attach:
log.error('videoroom-ctl: missing field')
return
try:
if feed_attach.session in self.videoroom_sessions:
raise APIError('feed-attach: video room session ID (%s) already in use' % feed_attach.session)
# get the 'base' session, the one used to join and publish
try:
base_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('feed-attach: unknown video room session ID specified: %s' % request.session)
# get the publisher's session
try:
publisher_session = base_session.room[feed_attach.publisher]
except KeyError:
raise APIError('feed-attach: unknown publisher video room session ID specified: %s' % feed_attach.publisher)
if publisher_session.publisher_id is None:
raise APIError('feed-attach: video room session ID does not have a publisher ID' % feed_attach.publisher)
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# join the room as a listener
data = {'request': 'join',
'room': base_session.room.id,
'ptype': 'listener',
'feed': publisher_session.publisher_id}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
videoroom_session = VideoRoomSessionInfo(feed_attach.session)
videoroom_session.janus_handle_id = handle_id
videoroom_session.parent_session = base_session
videoroom_session.publisher_id = publisher_session.id
videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room)
self.videoroom_sessions.add(videoroom_session)
base_session.feeds[publisher_session.publisher_id] = publisher_session.id
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Video room %s: %s attached to %s' % (base_session.room.uri, base_session.account_id, feed_attach.publisher))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-answer':
feed_answer = request.feed_answer
if not feed_answer:
log.error('videoroom-ctl: missing field')
return
try:
try:
videoroom_session = self.videoroom_sessions[request.feed_answer.session]
except KeyError:
raise APIError('feed-answer: unknown video room session ID specified: %s' % feed_answer.session)
data = {'request': 'start',
'room': videoroom_session.room.id}
jsep = {'type': 'answer', 'sdp': feed_answer.sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep))
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-detach':
feed_detach = request.feed_detach
if not feed_detach:
log.error('videoroom-ctl: missing field')
return
try:
try:
base_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('feed-detach: unknown video room session ID specified: %s' % request.session)
try:
videoroom_session = self.videoroom_sessions[feed_detach.session]
except KeyError:
raise APIError('feed-detach: unknown video room session ID specified: %s' % feed_detach.session)
data = {'request': 'leave'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
block_on(self.protocol.backend.janus_detach(self.janus_session_id, videoroom_session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(videoroom_session.janus_handle_id, None)
self.videoroom_sessions.remove(videoroom_session)
try:
janus_publisher_id = next(k for k, v in base_session.feeds.iteritems() if v == videoroom_session.publisher_id)
except StopIteration:
pass
else:
base_session.feeds.pop(janus_publisher_id)
elif request.option == 'invite-participants':
invite_participants = request.invite_participants
if not invite_participants:
log.error('videoroom-ctl: missing field')
return
try:
try:
base_session = self.videoroom_sessions[request.session]
account_info = self.accounts_map[base_session.account_id]
except KeyError:
raise APIError('invite-participants: unknown video room session ID specified: %s' % request.session)
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
for conn in self.protocol.factory.connections.difference([self]):
if conn.connection_handler:
conn.connection_handler._handle_conference_invite(account_info, base_session.room.uri, invite_participants.participants)
else:
log.error('videoroom-ctl: unsupported option: %s' % request.option)
def _OH_videoroom_terminate(self, request):
session = request.session
try:
try:
videoroom_session = self.videoroom_sessions[session]
except KeyError:
raise APIError('Unknown video room session ID specified: %s' % session)
data = {'request': 'leave'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
except APIError, e:
log.error('videoroom-terminate: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Video room %s: %s left the room (%d participants present)' % (videoroom_session.room.uri, videoroom_session.account_id, self.videoroom_sessions.count()))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='terminated'))
self._send_data(json.dumps(data))
self._cleanup_videoroom_session(videoroom_session)
self._maybe_destroy_videoroom(videoroom_session.room)
# Event handlers
def _OH_janus_event_sip(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
event = data['event']
if event_type == 'event':
self._janus_event_plugin_sip(data)
elif event_type == 'webrtcup':
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.msg('Could not find session for handle ID %s' % handle_id)
return
session_info.state = 'established'
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state))
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s state: %s' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
session_info.state))
# TODO: SessionEvent model
self._send_data(json.dumps(data))
elif event_type == 'hangup':
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.msg('Could not find session for handle ID %s' % handle_id)
return
if session_info.state != 'terminated':
session_info.state = 'terminated'
code = event.get('code', 0)
reason = event.get('reason', 'Unknown')
reason = '%d %s' % (code, reason)
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state, reason=reason))
# TODO: SessionEvent model
self._send_data(json.dumps(data))
self._cleanup_session(session_info)
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s terminated (%s)' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
reason))
elif event_type in ('media', 'detached'):
# ignore
pass
elif event_type == 'slowlink':
# TODO something
pass
else:
log.warn('Received unexpected event type: %s' % event_type)
def _janus_event_plugin_sip(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert(plugin_data['plugin'] == 'janus.plugin.sip')
event_data = event['plugindata']['data']
assert(event_data.get('sip', '') == 'event')
if 'result' not in event_data:
log.warn('Unexpected event: %s' % event)
return
event_data = event_data['result']
jsep = event.get('jsep', None)
event_type = event_data['event']
if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'):
# skip 'registered' events from session handles
if event_type == 'registered' and event_data['register_sent'] in (False, 'false'):
return
# account event
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
log.warn('Could not find account for handle ID %s' % handle_id)
return
if event_type == 'incomingcall':
originator_uri = SIP_PREFIX_RE.sub('', event_data['username'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
jsep = event.get('jsep', None)
assert jsep is not None
session_id = uuid.uuid4().hex
session = SIPSessionInfo(session_id)
session.janus_handle_id = handle_id
session.init_incoming(account_info.id, originator_uri, originator_display_name)
self.sessions_map[session_id] = session
self.session_handles_map[handle_id] = session
data = dict(sylkrtc='account_event',
account=account_info.id,
session=session_id,
event='incoming_session',
data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp']))
log.msg('Incoming session %s from %s to %s created' % (session.id,
session.remote_identity.uri,
session.local_identity.uri))
else:
registration_state = event_type
if registration_state == 'registration_failed':
registration_state = 'failed'
if account_info.registration_state == registration_state:
return
account_info.registration_state = registration_state
registration_data = dict(state=registration_state)
if registration_state == 'failed':
code = event_data['code']
reason = event_data['reason']
registration_data['reason'] = '%d %s' % (code, reason)
log.msg('Account %s registration failed: %s (%s)' % (account_info.id, code, reason))
elif registration_state == 'registered':
log.msg('Account %s registered using %s from %s' % (account_info.id, account_info.user_agent, self.end_point_address))
data = dict(sylkrtc='account_event',
account=account_info.id,
event='registration_state',
data=registration_data)
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('calling', 'accepted', 'hangup'):
# session event
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.warn('Could not find session for handle ID %s' % handle_id)
return
if event_type == 'hangup' and session_info.state == 'terminated':
return
if event_type == 'calling':
session_info.state = 'progress'
elif event_type == 'accepted':
session_info.state = 'accepted'
elif event_type == 'hangup':
session_info.state = 'terminated'
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state))
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s state: %s' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
session_info.state))
if session_info.state == 'accepted' and session_info.direction == 'outgoing':
assert jsep is not None
data['data']['sdp'] = jsep['sdp']
elif session_info.state == 'terminated':
code = event_data.get('code', 0)
reason = event_data.get('reason', 'Unknown')
reason = '%d %s' % (code, reason)
data['data']['reason'] = reason
# TODO: SessionEvent model
self._send_data(json.dumps(data))
if session_info.state == 'terminated':
self._cleanup_session(session_info)
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s terminated (%s)' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
reason))
# check if missed incoming call
if session_info.direction == 'incoming' and code == 487:
data = dict(sylkrtc='account_event',
account=session_info.account_id,
event='missed_session',
data=dict(originator=session_info.remote_identity.__dict__))
log.msg('Incoming session from %s to %s was not answered ' % (session_info.remote_identity.uri, session_info.local_identity.uri))
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type == 'missed_call':
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
log.warn('Could not find account for handle ID %s' % handle_id)
return
originator_uri = SIP_PREFIX_RE.sub('', event_data['caller'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
# We have no session, so create an identity object by hand
originator = SessionPartyIdentity(originator_uri, originator_display_name)
data = dict(sylkrtc='account_event',
account=account_info.id,
event='missed_session',
data=dict(originator=originator.__dict__))
log.msg('Incoming session from %s missed' % originator.uri)
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'):
# ignore
pass
else:
log.warn('Unexpected SIP plugin event type: %s' % event_type)
def _OH_janus_event_videoroom(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
if event_type == 'event':
self._janus_event_plugin_videoroom(data)
elif event_type == 'webrtcup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
base_session = videoroom_session.parent_session
if base_session is None:
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='established'))
else:
# this is a subscriber session
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='feed_established',
data=dict(state='established', subscription=videoroom_session.id))
log.msg('Video room %s: session established to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
self._send_data(json.dumps(data))
elif event_type == 'hangup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
log.msg('Video room %s: session terminated to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
self._cleanup_videoroom_session(videoroom_session)
self._maybe_destroy_videoroom(videoroom_session.room)
elif event_type in ('media', 'detached'):
# ignore
pass
elif event_type == 'slowlink':
try:
videoroom_session = (session_info for session_info in self.videoroom_sessions if session_info.janus_handle_id == handle_id).next()
except StopIteration:
log.warn('Could not find video room session for Janus handle ID %s' % handle_id)
else:
try:
account_info = self.accounts_map[videoroom_session.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % videoroom_session.account_id)
try:
uplink = data['event']['uplink']
except KeyError:
log.warn('Could not find uplink in slowlink event data')
else:
if uplink:
log.msg('Video room %s: %s has poor download connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
else:
log.msg('Video room %s: %s has poor upload connection on %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
else:
log.warn('Received unexpected event type %s: data=%s' % (event_type, data))
def _janus_event_plugin_videoroom(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert(plugin_data['plugin'] == 'janus.plugin.videoroom')
event_data = event['plugindata']['data']
assert 'videoroom' in event_data
event_type = event_data['videoroom']
if event_type == 'joined':
# a join request succeeded, this is a publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
room = videoroom_session.room
videoroom_session.publisher_id = event_data['id']
room.add(videoroom_session)
jsep = event.get('jsep', None)
assert jsep is not None
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='accepted', sdp=jsep['sdp']))
self._send_data(json.dumps(data))
# send information about existing publishers
publishers = []
for p in event_data['publishers']:
publisher_id = p['id']
publisher_display = p.get('display', '')
try:
publisher_session = room[publisher_id]
except KeyError:
log.warn('Could not find matching session for publisher %s' % publisher_id)
continue
item = {
'id': publisher_session.id,
'uri': publisher_session.account_id,
'display_name': publisher_display,
}
publishers.append(item)
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='initial_publishers',
data=dict(publishers=publishers))
self._send_data(json.dumps(data))
elif event_type == 'event':
if 'publishers' in event_data:
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
room = videoroom_session.room
# send information about new publishers
publishers = []
for p in event_data['publishers']:
publisher_id = p['id']
publisher_display = p.get('display', '')
try:
publisher_session = room[publisher_id]
except KeyError:
log.warn('Could not find matching session for publisher %s' % publisher_id)
continue
item = {
'id': publisher_session.id,
'uri': publisher_session.account_id,
'display_name': publisher_display,
}
publishers.append(item)
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='publishers_joined',
data=dict(publishers=publishers))
self._send_data(json.dumps(data))
elif 'leaving' in event_data:
try:
base_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
janus_publisher_id = event_data['leaving']
try:
publisher_id = base_session.feeds.pop(janus_publisher_id)
except KeyError:
return
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='publishers_left',
data=dict(publishers=[publisher_id]))
self._send_data(json.dumps(data))
elif {'started', 'unpublished', 'left'}.intersection(event_data):
# ignore
pass
else:
log.warn('Received unexpected plugin "event" event')
elif event_type == 'attached':
# sent when a feed is subscribed for a given publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
# get the session which originated the subscription
base_session = videoroom_session.parent_session
assert base_session is not None
jsep = event.get('jsep', None)
assert jsep is not None
assert jsep['type'] == 'offer'
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='feed_attached',
data=dict(sdp=jsep['sdp'], subscription=videoroom_session.id))
self._send_data(json.dumps(data))
elif event_type == 'slow_link':
pass
else:
log.warn('Received unexpected plugin event type %s: plugin_data=%s, event_data=%s' % (event_type, plugin_data, event_data))
diff --git a/sylk/applications/webrtcgateway/web/push.py b/sylk/applications/webrtcgateway/web/push.py
new file mode 100644
index 0000000..511da3c
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/push.py
@@ -0,0 +1,78 @@
+
+import json
+
+from sipsimple.util import ISOTimestamp
+from twisted.internet import defer, reactor
+from twisted.web.client import Agent
+from twisted.web.iweb import IBodyProducer
+from twisted.web.http_headers import Headers
+from zope.interface import implementer
+
+from sylk.applications.webrtcgateway.configuration import GeneralConfig
+from sylk.applications.webrtcgateway.logger import log
+
+__all__ = ['incoming_session', 'missed_session']
+
+
+agent = Agent(reactor)
+headers = Headers({'User-Agent': ['SylkServer'],
+ 'Content-Type': ['application/json'],
+ 'Authorization': ['key=%s' % GeneralConfig.firebase_server_key]})
+FIREBASE_API_URL = 'https://fcm.googleapis.com/fcm/send'
+
+
+@implementer(IBodyProducer)
+class StringProducer(object):
+ def __init__(self, data):
+ self.body = data
+ self.length = len(data)
+
+ def startProducing(self, consumer):
+ consumer.write(self.body)
+ return defer.succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+def incoming_session(originator, destination, tokens):
+ for token in tokens:
+ data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}}
+ data['notification']['body'] = 'Incoming session from %s' % originator
+ data['priority'] = 'high'
+ data['time_to_live'] = 60 # don't deliver if phone is out for over a minute
+ data['data']['sylkrtc']['event'] = 'incoming_session'
+ data['data']['sylkrtc']['originator'] = originator
+ data['data']['sylkrtc']['destination'] = destination
+ data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now())
+ _send_push_notification(json.dumps(data))
+
+
+def missed_session(originator, destination, tokens):
+ for token in tokens:
+ data = {'to': token, 'notification': {}, 'data': {'sylkrtc': {}}}
+ data['notification']['body'] = 'Missed session from %s' % originator
+ data['priority'] = 'high'
+ # No TTL, default is 4 weeks
+ data['data']['sylkrtc']['event'] = 'missed_session'
+ data['data']['sylkrtc']['originator'] = originator
+ data['data']['sylkrtc']['destination'] = destination
+ data['data']['sylkrtc']['timestamp'] = str(ISOTimestamp.now())
+ _send_push_notification(json.dumps(data))
+
+
+@defer.inlineCallbacks
+def _send_push_notification(payload):
+ if GeneralConfig.firebase_server_key:
+ try:
+ r = yield agent.request('POST', FIREBASE_API_URL, headers, StringProducer(payload))
+ except Exception, e:
+ log.msg('Error sending Firebase message: %s', e)
+ else:
+ if r.code != 200:
+ log.warn('Error sending Firebase message: %s' % r.phrase)
+ else:
+ log.warn('Cannot send push notification: no Firebase server key configured')
diff --git a/sylk/applications/webrtcgateway/web/storage.py b/sylk/applications/webrtcgateway/web/storage.py
new file mode 100644
index 0000000..c890405
--- /dev/null
+++ b/sylk/applications/webrtcgateway/web/storage.py
@@ -0,0 +1,43 @@
+
+__all__ = ['TokenStorage']
+
+import cPickle as pickle
+import os
+
+from application.python.types import Singleton
+from collections import defaultdict
+from sipsimple.threading import run_in_thread
+
+from sylk.configuration import ServerConfig
+
+
+class TokenStorage(object):
+ __metaclass__ = Singleton
+
+ def __init__(self):
+ self._tokens = defaultdict(set)
+
+ @run_in_thread('file-io')
+ def _save(self):
+ with open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'wb+') as f:
+ pickle.dump(self._tokens, f)
+
+ @run_in_thread('file-io')
+ def load(self):
+ try:
+ tokens = pickle.load(open(os.path.join(ServerConfig.spool_dir, 'webrtc_device_tokens'), 'rb'))
+ except Exception:
+ pass
+ else:
+ self._tokens.update(tokens)
+
+ def __getitem__(self, key):
+ return self._tokens[key]
+
+ def add(self, account, token):
+ self._tokens[account].add(token)
+ self._save()
+
+ def remove(self, account, token):
+ self._tokens[account].discard(token)
+ self._save()
diff --git a/webrtcgateway.ini.sample b/webrtcgateway.ini.sample
index 8b3ca9c..3254c95 100644
--- a/webrtcgateway.ini.sample
+++ b/webrtcgateway.ini.sample
@@ -1,48 +1,57 @@
; SylkServer WebRTC gateway configuration file
;
; For the gateway to work Janus needs to be properly installed and configured,
; please refer to README.webrtc for detailed instructions
;
[General]
; List of allowed web origins. The connection origin (Origin header in the
; HTTP request) will be checked against the list defined here, if the domain
; is no allowed the connection will be refused.
; * (the default) means any
; web_origins = *
; Proxy used for outbound SIP traffic
; outbound_sip_proxy =
; List of allowed SIP domains for managing accounts
; sip_domains = *
; Boolean indicating if the WebSocket messages sent to/from clients should be logged
; to a file
; trace_websocket = False
; WebSocket Ping frames are sent at the configured interval, this helps detect dead
; client connections
; websocket_ping_interval = 120
+; IP and port for the HTTP management interface
+; http_management_interface = 127.0.0.1:20888
+
+; Shared secret for the HTTP management interface (Authorization: key=THE_KEY)
+; http_management_auth_secret =
+
+; Server key for Firebase Cloud Messaging
+; firebase_server_key =
+
[Janus]
; URL pointing to the Janus API endpoint (only WebSocket is supported)
; api_url = ws://127.0.0.1:8188
; API secret shared with Janus (must match the value in janus.cfg)
; A random UUID value is recommended, a new value can be generated with
; the following command:
; python -c 'import uuid; print(uuid.uuid4().hex)'
api_secret = 0745f2f74f34451c89343afcdcae5809
; Boolean indicating if the messages between SylkServer and Janus should be logged to
; a file
; trace_janus = False
; Per room configuration options
; [room1@videoconference.example.com]
; record = True
; access_policy = deny, allow
; deny = all
; allow = domain1.com, test1@example.com, test2@example.com
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 7:15 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409016
Default Alt Text
(89 KB)
Attached To
Mode
rSYLK SylkServer
Attached
Detach File
Event Timeline
Log In to Comment