Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/handler.py b/sylk/applications/webrtcgateway/handler.py
index 4312811..4f20c5c 100644
--- a/sylk/applications/webrtcgateway/handler.py
+++ b/sylk/applications/webrtcgateway/handler.py
@@ -1,1323 +1,1320 @@
import json
import os
import random
import re
import uuid
import weakref
from application.python import Null
from application.system import makedirs
from eventlib import coros, proc
from eventlib.twistedutil import block_on
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config
from sylk.applications.webrtcgateway.logger import log
from sylk.applications.webrtcgateway.models import sylkrtc
from sylk.applications.webrtcgateway.storage import TokenStorage
from sylk.applications.webrtcgateway.util import GreenEvent
SIP_PREFIX_RE = re.compile('^sips?:')
class ACLValidationError(Exception): pass
sylkrtc_models = {
# account management
'account-add' : sylkrtc.AccountAddRequest,
'account-remove' : sylkrtc.AccountRemoveRequest,
'account-register' : sylkrtc.AccountRegisterRequest,
'account-unregister' : sylkrtc.AccountUnregisterRequest,
'account-devicetoken' : sylkrtc.AccountDeviceTokenRequest,
# session management
'session-create' : sylkrtc.SessionCreateRequest,
'session-answer' : sylkrtc.SessionAnswerRequest,
'session-trickle' : sylkrtc.SessionTrickleRequest,
'session-terminate' : sylkrtc.SessionTerminateRequest,
# video conference management
'videoroom-join' : sylkrtc.VideoRoomJoinRequest,
'videoroom-ctl' : sylkrtc.VideoRoomControlRequest,
'videoroom-terminate' : sylkrtc.VideoRoomTerminateRequest,
}
class AccountInfo(object):
def __init__(self, id, password, display_name=None, user_agent=None):
self.id = id
self.password = password
self.display_name = display_name
self.user_agent = user_agent
self.registration_state = None
self.janus_handle_id = None
@property
def uri(self):
return 'sip:%s' % self.id
class SessionPartyIdentity(object):
def __init__(self, uri, display_name=''):
self.uri = uri
self.display_name = display_name
class SIPSessionInfo(object):
def __init__(self, id):
self.id = id
self.direction = None
self.state = None
self.account_id = None
self.local_identity = None # instance of SessionPartyIdentity
self.remote_identity = None # instance of SessionPartyIdentity
self.janus_handle_id = None
self.trickle_ice_active = False
def init_outgoing(self, account_id, destination):
self.account_id = account_id
self.direction = 'outgoing'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(destination)
def init_incoming(self, account_id, originator, originator_display_name=''):
self.account_id = account_id
self.direction = 'incoming'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(originator, originator_display_name)
class VideoRoom(object):
def __init__(self, uri):
self.config = get_room_config(uri)
self.uri = uri
self.record = self.config.record
self.rec_dir = os.path.join(GeneralConfig.recording_dir, '%s/' % uri)
self.id = random.getrandbits(32) # janus needs numeric room names
self.destroyed = False
self._session_id_map = weakref.WeakValueDictionary()
self._publisher_id_map = weakref.WeakValueDictionary()
if self.record:
makedirs(self.rec_dir, 0755)
log.msg('Video room %s created with recording in %s' % (self.uri, self.rec_dir))
else:
log.msg('Video room %s created without recording' % self.uri)
def add(self, session):
assert session.publisher_id is not None
assert session.id not in self._session_id_map
assert session.publisher_id not in self._publisher_id_map
self._session_id_map[session.id] = session
self._publisher_id_map[session.publisher_id] = session
log.msg('Video room %s: added session %s for %s' % (self.uri, session.id, session.account_id))
def __getitem__(self, key):
try:
return self._session_id_map[key]
except KeyError:
return self._publisher_id_map[key]
def __len__(self):
return len(self._session_id_map)
class VideoRoomSessionInfo(object):
def __init__(self, id):
self.id = id
self.account_id = None
self.type = None # publisher / subscriber
self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers
self.janus_handle_id = None
self.room = None
self.parent_session = None
self.feeds = {} # janus publisher ID -> our publisher ID
self.trickle_ice_active = False
def initialize(self, account_id, type, room):
assert type in ('publisher', 'subscriber')
self.account_id = account_id
self.type = type
self.room = room
log.msg('Video room %s: new session %s initialized by %s' % (self.room.uri, self.id, self.account_id))
def __repr__(self):
return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type)
class VideoRoomSessionContainer(object):
def __init__(self):
self._sessions = set()
self._janus_handle_map = weakref.WeakValueDictionary()
self._id_map = weakref.WeakValueDictionary()
def add(self, session):
assert session not in self._sessions
assert session.janus_handle_id not in self._janus_handle_map
assert session.id not in self._id_map
self._sessions.add(session)
self._janus_handle_map[session.janus_handle_id] = session
self._id_map[session.id] = session
def remove(self, session):
self._sessions.discard(session)
def count(self):
return len(self._sessions)
def clear(self):
self._sessions.clear()
def __len__(self):
return len(self._sessions)
def __iter__(self):
return iter(self._sessions)
def __getitem__(self, key):
try:
return self._id_map[key]
except KeyError:
return self._janus_handle_map[key]
def __contains__(self, item):
return item in self._id_map or item in self._janus_handle_map or item in self._sessions
class Operation(object):
__slots__ = ('name', 'data')
def __init__(self, name, data):
self.name = name
self.data = data
class APIError(Exception):
pass
class ConnectionHandler(object):
def __init__(self, protocol):
self.protocol = protocol
self.janus_session_id = None
self.accounts_map = {} # account ID -> account
self.account_handles_map = {} # Janus handle ID -> account
self.sessions_map = {} # session ID -> session
self.session_handles_map = {} # Janus handle ID -> session
self.videoroom_sessions = VideoRoomSessionContainer() # session ID / janus handle ID -> session
self.ready_event = GreenEvent()
self.resolver = DNSLookup()
self.proc = proc.spawn(self._operations_handler)
self.operations_queue = coros.queue()
def start(self):
self._create_janus_session()
def stop(self):
if self.ready_event.is_set():
assert self.janus_session_id is not None
for account_info in self.accounts_map.values():
handle_id = account_info.janus_handle_id
if handle_id is not None:
self.protocol.backend.janus_detach(self.janus_session_id, handle_id)
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.protocol.backend.janus_stop_keepalive(self.janus_session_id)
self.protocol.backend.janus_destroy_session(self.janus_session_id)
if self.proc is not None:
self.proc.kill()
self.proc = None
# cleanup
self.ready_event.clear()
self.accounts_map.clear()
self.account_handles_map.clear()
self.sessions_map.clear()
self.session_handles_map.clear()
self.videoroom_sessions.clear()
self.janus_session_id = None
self.protocol = None
def handle_message(self, data):
try:
request_type = data.pop('sylkrtc')
except KeyError:
log.warn('Error getting WebSocket message type')
return
self.ready_event.wait()
try:
model = sylkrtc_models[request_type]
except KeyError:
log.warn('Unknown request type: %s' % request_type)
return
try:
request = model(**data)
request.validate()
except Exception, e:
log.error('%s: %s' % (request_type, e))
transaction = data.get('transaction') # we cannot rely on request.transaction as request may not have been created
if transaction:
self._send_response(sylkrtc.ErrorResponse(transaction=transaction, error=str(e)))
return
op = Operation(request_type, request)
self.operations_queue.send(op)
def validate_acl(self, room_uri, from_uri):
cfg = get_room_config(room_uri)
if cfg.access_policy == 'allow,deny':
if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri):
return
raise ACLValidationError
else:
if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri):
raise ACLValidationError
# internal methods (not overriding / implementing the protocol API)
def _send_response(self, response):
response.validate()
self._send_data(json.dumps(response.to_struct()))
def _send_data(self, data):
if GeneralConfig.trace_websocket:
self.protocol.factory.ws_logger.msg("OUT", ISOTimestamp.now(), data)
self.protocol.sendMessage(data, False)
def _cleanup_session(self, session):
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is noting to do here
return
self.sessions_map.pop(session.id)
if session.direction == 'outgoing':
# Destroy plugin handle for outgoing sessions. For incoming ones it's the
# same as the account handle, so don't
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
self.session_handles_map.pop(session.janus_handle_id)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _cleanup_videoroom_session(self, session):
@run_in_green_thread
def do_cleanup():
if self.janus_session_id is None:
# The connection was closed, there is noting to do here
return
if session in self.videoroom_sessions:
self.videoroom_sessions.remove(session)
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
# give it some time to receive other hangup events
reactor.callLater(2, do_cleanup)
def _maybe_destroy_videoroom(self, videoroom):
if videoroom is None:
return
@run_in_green_thread
def f():
if self.protocol is None:
# The connection was closed
return
# destroy the room if empty
if not videoroom and not videoroom.destroyed:
videoroom.destroyed = True
self.protocol.factory.videorooms.remove(videoroom)
# create a handle to do the cleanup
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
data = {'request': 'destroy',
'room': videoroom.id}
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except Exception:
pass
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
log.msg('Video room %s destroyed' % videoroom.uri)
# don't destroy it immediately
reactor.callLater(5, f)
@run_in_green_thread
def _create_janus_session(self):
if self.ready_event.is_set():
self._send_response(sylkrtc.ReadyEvent())
return
try:
self.janus_session_id = block_on(self.protocol.backend.janus_create_session())
self.protocol.backend.janus_start_keepalive(self.janus_session_id)
except Exception, e:
log.warn('Error creating session, disconnecting: %s' % e)
self.protocol.disconnect(3000, unicode(e))
return
self._send_response(sylkrtc.ReadyEvent())
self.ready_event.set()
def _lookup_sip_proxy(self, uri):
- sip_uri = SIPURI.parse('sip:%s' % uri)
-
# The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed
# as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use
# caching to avoid long delays, we randomize the results matching the highest priority route's
# transport.
+
proxy = GeneralConfig.outbound_sip_proxy
if proxy is not None:
- proxy_uri = SIPURI(host=proxy.host,
- port=proxy.port,
- parameters={'transport': proxy.transport})
+ sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport})
else:
- proxy_uri = SIPURI(host=sip_uri.host)
+ sip_uri = SIPURI.parse('sip:%s' % uri)
settings = SIPSimpleSettings()
- routes = self.resolver.lookup_sip_proxy(proxy_uri, settings.sip.transport_list).wait()
+ routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait()
if not routes:
raise DNSLookupError('no results found')
# Get all routes with the highest priority transport and randomly pick one
route = random.choice([r for r in routes if r.transport == routes[0].transport])
log.debug("DNS lookup for SIP proxy for {} yielded {}".format(uri, route))
# Build a proxy URI Sofia-SIP likes
return 'sips:{route.address}:{route.port}'.format(route=route) if route.transport == 'tls' else str(route.uri)
def _handle_conference_invite(self, originator, room_uri, participants):
for p in participants:
try:
account_info = self.accounts_map[p]
except KeyError:
continue
data = dict(sylkrtc='account_event',
account=account_info.id,
event='conference_invite',
data=dict(originator=dict(uri=originator.id, display_name=originator.display_name), room=room_uri))
log.msg('Video room %s: invitation from %s to %s' % (room_uri, originator.id, account_info.id))
self._send_data(json.dumps(data))
def _handle_janus_event_sip(self, handle_id, event_type, event):
# TODO: use a model
op = Operation('janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event))
self.operations_queue.send(op)
def _handle_janus_event_videoroom(self, handle_id, event_type, event):
# TODO: use a model
op = Operation('janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event))
self.operations_queue.send(op)
def _operations_handler(self):
while True:
op = self.operations_queue.wait()
handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null)
try:
handler(op.data)
except Exception:
log.exception('Unhandled exception in operation "%s"' % op.name)
del op, handler
def _OH_account_add(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
password = request.password
display_name = request.display_name
user_agent = request.user_agent
try:
if account in self.accounts_map:
raise APIError('Account %s already added' % account)
# check if domain is acceptable
domain = account.partition('@')[2]
if not {'*', domain}.intersection(GeneralConfig.sip_domains):
raise APIError('SIP domain not allowed: %s' % domain)
# Create and store our mapping
account_info = AccountInfo(account, password, display_name, user_agent)
self.accounts_map[account_info.id] = account_info
except APIError, e:
log.error('account_add: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s added using %s at %s' % (account, user_agent, self.protocol.peer))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_remove(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map.pop(account)
except KeyError:
raise APIError('Unknown account specified: %s' % account)
# cleanup in case the client didn't unregister before removing the account
handle_id = account_info.janus_handle_id
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
except APIError, e:
log.error('account_remove: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s removed' % account)
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_register(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
try:
proxy = self._lookup_sip_proxy(account)
except DNSLookupError:
raise APIError('DNS lookup error')
handle_id = account_info.janus_handle_id
if handle_id is not None:
# Destroy the existing plugin handle
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
account_info.janus_handle_id = None
# Create a plugin handle
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
account_info.janus_handle_id = handle_id
self.account_handles_map[handle_id] = account_info
data = {'request': 'register',
'username': account_info.uri,
'display_name': account_info.display_name,
'user_agent': account_info.user_agent,
'ha1_secret': account_info.password,
'proxy': proxy}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except APIError, e:
log.error('account-register: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.debug('Account %s will register using %s' % (account, account_info.user_agent))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_unregister(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
handle_id = account_info.janus_handle_id
if handle_id is not None:
log.msg('Account %s will unregister' % account)
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
account_info.janus_handle_id = None
self.account_handles_map.pop(handle_id)
except APIError, e:
log.error('account-unregister: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Account %s removed' % account)
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_account_devicetoken(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
old_token = request.old_token
new_token = request.new_token
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
storage = TokenStorage()
if old_token is not None:
storage.remove(account, old_token)
log.msg('Removed device token %s for account %s using %s' % (old_token, account, account_info.user_agent))
if new_token is not None:
storage.add(account, new_token)
log.msg('Added device token %s for account %s using %s' % (new_token, account, account_info.user_agent))
except APIError, e:
log.error('account-devicetoken: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_create(self, request):
# extract the fields to avoid going through the descriptor several times
account = request.account
session = request.session
uri = request.uri
sdp = request.sdp
try:
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
if session in self.sessions_map:
raise APIError('Session ID (%s) already in use' % session)
# Create a new plugin handle and 'register' it, without actually doing so
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
try:
proxy = self._lookup_sip_proxy(uri)
except DNSLookupError:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
raise APIError('DNS lookup error')
data = {'request': 'register',
'username': account_info.uri,
'display_name': account_info.display_name,
'user_agent': account_info.user_agent,
'ha1_secret': account_info.password,
'proxy': proxy,
'send_register': False}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
session_info = SIPSessionInfo(session)
session_info.janus_handle_id = handle_id
session_info.init_outgoing(account, uri)
# TODO: create a "SessionContainer" object combining the 2
self.sessions_map[session_info.id] = session_info
self.session_handles_map[handle_id] = session_info
data = {'request': 'call', 'uri': 'sip:%s' % SIP_PREFIX_RE.sub('', uri), 'srtp': 'sdes_optional'}
jsep = {'type': 'offer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
except APIError, e:
log.error('session-create: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Outgoing session %s from %s to %s started using %s from %s' % (session, account, uri, account_info.user_agent, self.protocol.peer))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_answer(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
sdp = request.sdp
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.direction != 'incoming':
raise APIError('Cannot answer outgoing session')
if session_info.state != 'connecting':
raise APIError('Invalid state for session answer')
data = {'request': 'accept'}
jsep = {'type': 'answer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep))
except APIError, e:
log.error('session-answer: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('%s answered session %s' % (session_info.account_id, session))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_trickle(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
candidates = [c.to_struct() for c in request.candidates]
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.state == 'terminated':
raise APIError('Session is terminated')
try:
account_info = self.accounts_map[session_info.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % session_info.account_id)
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates))
except APIError, e:
log.error('session-trickle: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
if candidates:
if not session_info.trickle_ice_active:
log.msg('Session %s: ICE negotiation started by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
session_info.trickle_ice_active = True
else:
log.msg('Session %s: ICE negotiation ended by %s using %s' % (session_info.id, session_info.account_id, account_info.user_agent))
session_info.trickle_ice_active = False
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_session_terminate(self, request):
# extract the fields to avoid going through the descriptor several times
session = request.session
try:
try:
session_info = self.sessions_map[session]
except KeyError:
raise APIError('Unknown session specified: %s' % session)
if session_info.state not in ('connecting', 'progress', 'accepted', 'established'):
raise APIError('Invalid state for session terminate: \"%s\"' % session_info.state)
if session_info.direction == 'incoming' and session_info.state == 'connecting':
data = {'request': 'decline', 'code': 486}
else:
data = {'request': 'hangup'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data))
except APIError, e:
log.error('session-terminate: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.debug('%s terminated session %s' % (session_info.account_id, session))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
def _OH_videoroom_join(self, request):
account = request.account
session = request.session
uri = request.uri
sdp = request.sdp
videoroom = None
try:
try:
self.validate_acl(uri, account)
except ACLValidationError:
raise APIError('%s is not allowed to join room %s' % (account, uri))
try:
account_info = self.accounts_map[account]
except KeyError:
raise APIError('Unknown account specified: %s' % account)
if session in self.videoroom_sessions:
raise APIError('Video room session ID (%s) already in use' % session)
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# create the room if it doesn't exist
try:
videoroom = self.protocol.factory.videorooms[uri]
except KeyError:
videoroom = VideoRoom(uri)
self.protocol.factory.videorooms.add(videoroom)
data = {'request': 'create',
'room': videoroom.id,
'publishers': 10,
'record': videoroom.record,
'rec_dir': videoroom.rec_dir}
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except Exception, e:
code = getattr(e, 'code', -1)
if code != 427: # 417 == room exists
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
raise APIError(str(e))
# join the room
data = {'request': 'joinandconfigure',
'room': videoroom.id,
'ptype': 'publisher',
'audio': True,
'video': True}
if account_info.display_name:
data['display'] = account_info.display_name
jsep = {'type': 'offer', 'sdp': sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
videoroom_session = VideoRoomSessionInfo(session)
videoroom_session.janus_handle_id = handle_id
videoroom_session.initialize(account, 'publisher', videoroom)
self.videoroom_sessions.add(videoroom_session)
except APIError, e:
log.error('videoroom-join: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
self._maybe_destroy_videoroom(videoroom)
else:
log.msg('Video room %s: joined by %s using %s (%d participants present) from %s' % (videoroom.uri, account, account_info.user_agent, self.videoroom_sessions.count(), self.protocol.peer))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='progress'))
self._send_data(json.dumps(data))
def _OH_videoroom_ctl(self, request):
if request.option == 'trickle':
trickle = request.trickle
if not trickle:
log.error('videoroom-ctl: missing field')
return
candidates = [c.to_struct() for c in trickle.candidates]
session = trickle.session or request.session
try:
try:
videoroom_session = self.videoroom_sessions[session]
except KeyError:
raise APIError('trickle: unknown video room session ID specified: %s' % session)
try:
account_info = self.accounts_map[videoroom_session.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % videoroom_session.account_id)
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates))
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
if candidates:
if not videoroom_session.trickle_ice_active:
log.msg('Video room %s: ICE negotiation started by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
videoroom_session.trickle_ice_active = True
else:
log.msg('Video room %s: ICE negotiation ended by %s using %s' % (videoroom_session.room.uri, videoroom_session.account_id, account_info.user_agent))
videoroom_session.trickle_ice_active = False
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-attach':
feed_attach = request.feed_attach
if not feed_attach:
log.error('videoroom-ctl: missing field')
return
try:
if feed_attach.session in self.videoroom_sessions:
raise APIError('feed-attach: video room session ID (%s) already in use' % feed_attach.session)
# get the 'base' session, the one used to join and publish
try:
base_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('feed-attach: unknown video room session ID specified: %s' % request.session)
# get the publisher's session
try:
publisher_session = base_session.room[feed_attach.publisher]
except KeyError:
raise APIError('feed-attach: unknown publisher video room session ID specified: %s' % feed_attach.publisher)
if publisher_session.publisher_id is None:
raise APIError('feed-attach: video room session ID does not have a publisher ID' % feed_attach.publisher)
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# join the room as a listener
data = {'request': 'join',
'room': base_session.room.id,
'ptype': 'listener',
'feed': publisher_session.publisher_id}
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
videoroom_session = VideoRoomSessionInfo(feed_attach.session)
videoroom_session.janus_handle_id = handle_id
videoroom_session.parent_session = base_session
videoroom_session.publisher_id = publisher_session.id
videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room)
self.videoroom_sessions.add(videoroom_session)
base_session.feeds[publisher_session.publisher_id] = publisher_session.id
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Video room %s: %s attached to %s' % (base_session.room.uri, base_session.account_id, feed_attach.publisher))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-answer':
feed_answer = request.feed_answer
if not feed_answer:
log.error('videoroom-ctl: missing field')
return
try:
try:
videoroom_session = self.videoroom_sessions[request.feed_answer.session]
except KeyError:
raise APIError('feed-answer: unknown video room session ID specified: %s' % feed_answer.session)
data = {'request': 'start',
'room': videoroom_session.room.id}
jsep = {'type': 'answer', 'sdp': feed_answer.sdp}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep))
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
elif request.option == 'feed-detach':
feed_detach = request.feed_detach
if not feed_detach:
log.error('videoroom-ctl: missing field')
return
try:
try:
base_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('feed-detach: unknown video room session ID specified: %s' % request.session)
try:
videoroom_session = self.videoroom_sessions[feed_detach.session]
except KeyError:
raise APIError('feed-detach: unknown video room session ID specified: %s' % feed_detach.session)
data = {'request': 'leave'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
block_on(self.protocol.backend.janus_detach(self.janus_session_id, videoroom_session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(videoroom_session.janus_handle_id, None)
self.videoroom_sessions.remove(videoroom_session)
try:
janus_publisher_id = next(k for k, v in base_session.feeds.iteritems() if v == videoroom_session.publisher_id)
except StopIteration:
pass
else:
base_session.feeds.pop(janus_publisher_id)
elif request.option == 'invite-participants':
invite_participants = request.invite_participants
if not invite_participants:
log.error('videoroom-ctl: missing field')
return
try:
try:
base_session = self.videoroom_sessions[request.session]
account_info = self.accounts_map[base_session.account_id]
except KeyError:
raise APIError('invite-participants: unknown video room session ID specified: %s' % request.session)
except APIError, e:
log.error('videoroom-ctl: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
for conn in self.protocol.factory.connections.difference([self]):
if conn.connection_handler:
conn.connection_handler._handle_conference_invite(account_info, base_session.room.uri, invite_participants.participants)
else:
log.error('videoroom-ctl: unsupported option: %s' % request.option)
def _OH_videoroom_terminate(self, request):
session = request.session
try:
try:
videoroom_session = self.videoroom_sessions[session]
except KeyError:
raise APIError('Unknown video room session ID specified: %s' % session)
data = {'request': 'leave'}
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
except APIError, e:
log.error('videoroom-terminate: %s' % e)
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
log.msg('Video room %s: %s left the room (%d participants present)' % (videoroom_session.room.uri, videoroom_session.account_id, self.videoroom_sessions.count()))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='terminated'))
self._send_data(json.dumps(data))
self._cleanup_videoroom_session(videoroom_session)
self._maybe_destroy_videoroom(videoroom_session.room)
# Event handlers
def _OH_janus_event_sip(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
event = data['event']
if event_type == 'event':
self._janus_event_plugin_sip(data)
elif event_type == 'webrtcup':
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.msg('Could not find session for handle ID %s' % handle_id)
return
session_info.state = 'established'
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state))
direction = session_info.direction.title()
log.debug('%s session %s from %s to %s state: %s' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
session_info.state))
# TODO: SessionEvent model
self._send_data(json.dumps(data))
elif event_type == 'hangup':
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.msg('Could not find session for handle ID %s' % handle_id)
return
if session_info.state != 'terminated':
session_info.state = 'terminated'
code = event.get('code', 0)
reason = event.get('reason', 'Unknown')
reason = '%d %s' % (code, reason)
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state, reason=reason))
# TODO: SessionEvent model
self._send_data(json.dumps(data))
self._cleanup_session(session_info)
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s terminated (%s)' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
reason))
elif event_type in ('media', 'detached'):
# ignore
pass
elif event_type == 'slowlink':
# TODO something
pass
else:
log.warn('Received unexpected event type: %s' % event_type)
def _janus_event_plugin_sip(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert(plugin_data['plugin'] == 'janus.plugin.sip')
event_data = event['plugindata']['data']
assert(event_data.get('sip', '') == 'event')
if 'result' not in event_data:
log.warn('Unexpected event: %s' % event)
return
event_data = event_data['result']
jsep = event.get('jsep', None)
event_type = event_data['event']
if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'):
# skip 'registered' events from session handles
if event_type == 'registered' and event_data['register_sent'] in (False, 'false'):
return
# account event
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
log.warn('Could not find account for handle ID %s' % handle_id)
return
if event_type == 'incomingcall':
originator_uri = SIP_PREFIX_RE.sub('', event_data['username'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
jsep = event.get('jsep', None)
assert jsep is not None
session_id = uuid.uuid4().hex
session = SIPSessionInfo(session_id)
session.janus_handle_id = handle_id
session.init_incoming(account_info.id, originator_uri, originator_display_name)
self.sessions_map[session_id] = session
self.session_handles_map[handle_id] = session
data = dict(sylkrtc='account_event',
account=account_info.id,
session=session_id,
event='incoming_session',
data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp']))
log.msg('Incoming session %s from %s to %s created' % (session.id,
session.remote_identity.uri,
session.local_identity.uri))
else:
registration_state = event_type
if registration_state == 'registration_failed':
registration_state = 'failed'
if account_info.registration_state == registration_state:
return
account_info.registration_state = registration_state
registration_data = dict(state=registration_state)
if registration_state == 'failed':
code = event_data['code']
reason = event_data['reason']
registration_data['reason'] = '%d %s' % (code, reason)
log.msg('Account %s registration failed: %s (%s)' % (account_info.id, code, reason))
elif registration_state == 'registered':
log.msg('Account %s registered using %s from %s' % (account_info.id, account_info.user_agent, self.protocol.peer))
data = dict(sylkrtc='account_event',
account=account_info.id,
event='registration_state',
data=registration_data)
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('calling', 'accepted', 'hangup'):
# session event
try:
session_info = self.session_handles_map[handle_id]
except KeyError:
log.warn('Could not find session for handle ID %s' % handle_id)
return
if event_type == 'hangup' and session_info.state == 'terminated':
return
if event_type == 'calling':
session_info.state = 'progress'
elif event_type == 'accepted':
session_info.state = 'accepted'
elif event_type == 'hangup':
session_info.state = 'terminated'
data = dict(sylkrtc='session_event',
session=session_info.id,
event='state',
data=dict(state=session_info.state))
direction = session_info.direction.title()
log.debug('%s session %s from %s to %s state: %s' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
session_info.state))
if session_info.state == 'accepted' and session_info.direction == 'outgoing':
assert jsep is not None
data['data']['sdp'] = jsep['sdp']
elif session_info.state == 'terminated':
code = event_data.get('code', 0)
reason = event_data.get('reason', 'Unknown')
reason = '%d %s' % (code, reason)
data['data']['reason'] = reason
# TODO: SessionEvent model
self._send_data(json.dumps(data))
if session_info.state == 'terminated':
self._cleanup_session(session_info)
direction = session_info.direction.title()
log.msg('%s session %s from %s to %s terminated (%s)' % (direction,
session_info.id,
session_info.local_identity.uri if direction == 'Outgoing' else session_info.remote_identity.uri,
session_info.remote_identity.uri if direction == 'Outgoing' else session_info.local_identity.uri,
reason))
# check if missed incoming call
if session_info.direction == 'incoming' and code == 487:
data = dict(sylkrtc='account_event',
account=session_info.account_id,
event='missed_session',
data=dict(originator=session_info.remote_identity.__dict__))
log.msg('Incoming session from %s to %s was not answered ' % (session_info.remote_identity.uri, session_info.local_identity.uri))
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type == 'missed_call':
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
log.warn('Could not find account for handle ID %s' % handle_id)
return
originator_uri = SIP_PREFIX_RE.sub('', event_data['caller'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
# We have no session, so create an identity object by hand
originator = SessionPartyIdentity(originator_uri, originator_display_name)
data = dict(sylkrtc='account_event',
account=account_info.id,
event='missed_session',
data=dict(originator=originator.__dict__))
log.msg('Incoming session from %s missed' % originator.uri)
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'):
# ignore
pass
else:
log.warn('Unexpected SIP plugin event type: %s' % event_type)
def _OH_janus_event_videoroom(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
if event_type == 'event':
self._janus_event_plugin_videoroom(data)
elif event_type == 'webrtcup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
base_session = videoroom_session.parent_session
if base_session is None:
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='established'))
else:
# this is a subscriber session
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='feed_established',
data=dict(state='established', subscription=videoroom_session.id))
log.msg('Video room %s: session established to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
self._send_data(json.dumps(data))
elif event_type == 'hangup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
log.msg('Video room %s: session terminated to %s' % (videoroom_session.room.uri, videoroom_session.account_id))
self._cleanup_videoroom_session(videoroom_session)
self._maybe_destroy_videoroom(videoroom_session.room)
elif event_type in ('media', 'detached'):
# ignore
pass
elif event_type == 'slowlink':
try:
videoroom_session = (session_info for session_info in self.videoroom_sessions if session_info.janus_handle_id == handle_id).next()
except StopIteration:
log.warn('Could not find video room session for Janus handle ID %s' % handle_id)
else:
try:
account_info = self.accounts_map[videoroom_session.account_id]
except KeyError:
raise APIError('Unknown account specified: %s' % videoroom_session.account_id)
try:
uplink = data['event']['uplink']
except KeyError:
log.warn('Could not find uplink in slowlink event data')
else:
direction = 'download' if uplink else 'upload' # uplink is from janus' point of view
log.msg('Video room {0.room.uri}: poor {1} connection for {2.id} using {2.user_agent}'.format(videoroom_session, direction, account_info))
else:
log.warn('Received unexpected event type %s: data=%s' % (event_type, data))
def _janus_event_plugin_videoroom(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert(plugin_data['plugin'] == 'janus.plugin.videoroom')
event_data = event['plugindata']['data']
assert 'videoroom' in event_data
event_type = event_data['videoroom']
if event_type == 'joined':
# a join request succeeded, this is a publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
room = videoroom_session.room
videoroom_session.publisher_id = event_data['id']
room.add(videoroom_session)
jsep = event.get('jsep', None)
assert jsep is not None
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='state',
data=dict(state='accepted', sdp=jsep['sdp']))
self._send_data(json.dumps(data))
# send information about existing publishers
publishers = []
for p in event_data['publishers']:
publisher_id = p['id']
publisher_display = p.get('display', '')
try:
publisher_session = room[publisher_id]
except KeyError:
log.warn('Could not find matching session for publisher %s' % publisher_id)
continue
item = {'id': publisher_session.id,
'uri': publisher_session.account_id,
'display_name': publisher_display}
publishers.append(item)
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='initial_publishers',
data=dict(publishers=publishers))
self._send_data(json.dumps(data))
elif event_type == 'event':
if 'publishers' in event_data:
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
room = videoroom_session.room
# send information about new publishers
publishers = []
for p in event_data['publishers']:
publisher_id = p['id']
publisher_display = p.get('display', '')
try:
publisher_session = room[publisher_id]
except KeyError:
log.warn('Could not find matching session for publisher %s' % publisher_id)
continue
item = {'id': publisher_session.id,
'uri': publisher_session.account_id,
'display_name': publisher_display}
publishers.append(item)
data = dict(sylkrtc='videoroom_event',
session=videoroom_session.id,
event='publishers_joined',
data=dict(publishers=publishers))
self._send_data(json.dumps(data))
elif 'leaving' in event_data:
try:
base_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find video room session for handle ID %s' % handle_id)
return
janus_publisher_id = event_data['leaving']
try:
publisher_id = base_session.feeds.pop(janus_publisher_id)
except KeyError:
return
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='publishers_left',
data=dict(publishers=[publisher_id]))
self._send_data(json.dumps(data))
elif {'started', 'unpublished', 'left'}.intersection(event_data):
# ignore
pass
else:
log.warn('Received unexpected plugin "event" event')
elif event_type == 'attached':
# sent when a feed is subscribed for a given publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
log.warn('Could not find videoroom session for handle ID %s' % handle_id)
return
# get the session which originated the subscription
base_session = videoroom_session.parent_session
assert base_session is not None
jsep = event.get('jsep', None)
assert jsep is not None
assert jsep['type'] == 'offer'
data = dict(sylkrtc='videoroom_event',
session=base_session.id,
event='feed_attached',
data=dict(sdp=jsep['sdp'], subscription=videoroom_session.id))
self._send_data(json.dumps(data))
elif event_type == 'slow_link':
pass
else:
log.warn('Received unexpected plugin event type %s: plugin_data=%s, event_data=%s' % (event_type, plugin_data, event_data))

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 2:04 PM (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409195
Default Alt Text
(63 KB)

Event Timeline