Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/handler.py b/sylk/applications/webrtcgateway/handler.py
index 4ced5eb..87bccc8 100644
--- a/sylk/applications/webrtcgateway/handler.py
+++ b/sylk/applications/webrtcgateway/handler.py
@@ -1,1312 +1,1336 @@
import hashlib
import json
import random
import re
import time
import uuid
-from application.python import Null, limit
+from application.python import limit
from application.python.weakref import defaultweakobjectmap
from application.system import makedirs
from eventlib import coros, proc
from eventlib.twistedutil import block_on
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading.green import call_in_green_thread, run_in_green_thread
+from string import maketrans
from twisted.internet import reactor
from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config
from sylk.applications.webrtcgateway.janus.backend import JanusError
from sylk.applications.webrtcgateway.logger import ConnectionLogger, VideoroomLogger
from sylk.applications.webrtcgateway.models import sylkrtc
from sylk.applications.webrtcgateway.storage import TokenStorage
from sylk.applications.webrtcgateway.util import GreenEvent
SIP_PREFIX_RE = re.compile('^sips?:')
sylkrtc_models = {model.sylkrtc.value: model for model in vars(sylkrtc).values() if hasattr(model, 'sylkrtc') and issubclass(model, sylkrtc.SylkRTCRequestBase)}
class AccountInfo(object):
def __init__(self, id, password, display_name=None, user_agent=None):
self.id = id
self.password = password
self.display_name = display_name
self.user_agent = user_agent
self.registration_state = None
self.janus_handle_id = None
@property
def uri(self):
return 'sip:%s' % self.id
@property
def user_data(self):
return dict(username=self.uri, display_name=self.display_name, user_agent=self.user_agent, ha1_secret=self.password)
class SessionPartyIdentity(object):
def __init__(self, uri, display_name=''):
self.uri = uri
self.display_name = display_name
# todo: might need to replace this auto-resetting descriptor with a timer in case we need to know when the slow link state expired
#
class SlowLinkState(object):
def __init__(self):
self.slow_link = False
self.last_reported = 0
class SlowLinkDescriptor(object):
__timeout__ = 30 # 30 seconds
def __init__(self):
self.values = defaultweakobjectmap(SlowLinkState)
def __get__(self, instance, owner):
if instance is None:
return self
state = self.values[instance]
if state.slow_link and time.time() - state.last_reported > self.__timeout__:
state.slow_link = False
return state.slow_link
def __set__(self, instance, value):
state = self.values[instance]
if value:
state.last_reported = time.time()
state.slow_link = bool(value)
def __delete__(self, instance):
raise AttributeError('Attribute cannot be deleted')
class SIPSessionInfo(object):
slow_download = SlowLinkDescriptor()
slow_upload = SlowLinkDescriptor()
def __init__(self, id):
self.id = id
self.direction = None
self.state = None
self.account_id = None
self.local_identity = None # instance of SessionPartyIdentity
self.remote_identity = None # instance of SessionPartyIdentity
self.janus_handle_id = None
self.slow_download = False
self.slow_upload = False
def init_outgoing(self, account_id, destination):
self.account_id = account_id
self.direction = 'outgoing'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(destination)
def init_incoming(self, account_id, originator, originator_display_name=''):
self.account_id = account_id
self.direction = 'incoming'
self.state = 'connecting'
self.local_identity = SessionPartyIdentity(account_id)
self.remote_identity = SessionPartyIdentity(originator, originator_display_name)
class VideoRoom(object):
def __init__(self, uri):
self.id = random.getrandbits(32) # janus needs numeric room names
self.uri = uri
self.config = get_room_config(uri)
self.log = VideoroomLogger(self)
self._active_participants = []
self._sessions = set()
self._id_map = {} # map session.id -> session and session.publisher_id -> session
if self.config.record:
makedirs(self.config.recording_dir, 0o755)
self.log.info('created (recording on)')
else:
self.log.info('created')
@property
def active_participants(self):
return self._active_participants
@active_participants.setter
def active_participants(self, participant_list):
unknown_participants = set(participant_list).difference(self._id_map)
if unknown_participants:
raise ValueError('unknown participant session id: {}'.format(', '.join(unknown_participants)))
if self._active_participants != participant_list:
self._active_participants = participant_list
self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None))
self._update_bitrate()
def add(self, session):
assert session not in self._sessions
assert session.publisher_id is not None
assert session.publisher_id not in self._id_map and session.id not in self._id_map
self._sessions.add(session)
self._id_map[session.id] = self._id_map[session.publisher_id] = session
self.log.info('{session.account_id} has joined the room'.format(session=session))
self._update_bitrate()
if self._active_participants:
session.owner.notify(sylkrtc.VideoRoomConfigurationEvent(session=session.id, active_participants=self._active_participants, originator='videoroom'))
def discard(self, session):
if session in self._sessions:
self._sessions.discard(session)
self._id_map.pop(session.id, None)
self._id_map.pop(session.publisher_id, None)
self.log.info('{session.account_id} has left the room'.format(session=session))
if session.id in self._active_participants:
self._active_participants.remove(session.id)
self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None))
for session in self._sessions:
session.owner.notify(sylkrtc.VideoRoomConfigurationEvent(session=session.id, active_participants=self._active_participants, originator='videoroom'))
self._update_bitrate()
def remove(self, session):
self._sessions.remove(session)
self._id_map.pop(session.id)
self._id_map.pop(session.publisher_id)
self.log.info('{session.account_id} has left the room'.format(session=session))
if session.id in self._active_participants:
self._active_participants.remove(session.id)
self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None))
for session in self._sessions:
session.owner.notify(sylkrtc.VideoRoomConfigurationEvent(session=session.id, active_participants=self._active_participants, originator='videoroom'))
self._update_bitrate()
def clear(self):
for session in self._sessions:
self.log.info('{session.account_id} has left the room'.format(session=session))
self._active_participants = []
self._sessions.clear()
self._id_map.clear()
def allow_uri(self, uri):
config = self.config
if config.access_policy == 'allow,deny':
return config.allow.match(uri) and not config.deny.match(uri)
else:
return not config.deny.match(uri) or config.allow.match(uri)
def _update_bitrate(self):
if self._sessions:
if self._active_participants:
# todo: should we use max_bitrate / 2 or max_bitrate for each active participant if there are 2 active participants?
active_participant_bitrate = self.config.max_bitrate // len(self._active_participants)
other_participant_bitrate = 100000
self.log.info('participant bitrate is {} (active) / {} (others)'.format(active_participant_bitrate, other_participant_bitrate))
for session in self._sessions:
if session.id in self._active_participants:
bitrate = active_participant_bitrate
else:
bitrate = other_participant_bitrate
if session.bitrate != bitrate:
session.bitrate = bitrate
data = dict(request='configure', room=self.id, bitrate=bitrate)
session.owner.protocol.backend.janus_message(session.owner.janus_session_id, session.janus_handle_id, data)
else:
bitrate = self.config.max_bitrate // limit(len(self._sessions) - 1, min=1)
self.log.info('participant bitrate is {}'.format(bitrate))
for session in self._sessions:
if session.bitrate != bitrate:
session.bitrate = bitrate
data = dict(request='configure', room=self.id, bitrate=bitrate)
session.owner.protocol.backend.janus_message(session.owner.janus_session_id, session.janus_handle_id, data)
# todo: make VideoRoom be a context manager that is retained/released on enter/exit and implement __nonzero__ to be different from __len__
# todo: so that a videoroom is not accidentally released by the last participant leaving while a new participant waits to join
# todo: this needs a new model for communication with janus and the client that is pseudo-synchronous (uses green threads)
def __len__(self):
return len(self._sessions)
def __iter__(self):
return iter(self._sessions)
def __getitem__(self, key):
return self._id_map[key]
def __contains__(self, item):
return item in self._id_map or item in self._sessions
class PublisherFeedContainer(object):
"""A container for the other participant's publisher sessions that we have subscribed to"""
def __init__(self):
self._publishers = set()
self._id_map = {} # map publisher.id -> publisher and publisher.publisher_id -> publisher
def add(self, session):
assert session not in self._publishers
assert session.id not in self._id_map and session.publisher_id not in self._id_map
self._publishers.add(session)
self._id_map[session.id] = self._id_map[session.publisher_id] = session
def discard(self, item): # item can be any of session, session.id or session.publisher_id
session = self._id_map[item] if item in self._id_map else item if item in self._publishers else None
if session is not None:
self._publishers.discard(session)
self._id_map.pop(session.id, None)
self._id_map.pop(session.publisher_id, None)
def remove(self, item): # item can be any of session, session.id or session.publisher_id
session = self._id_map[item] if item in self._id_map else item
self._publishers.remove(session)
self._id_map.pop(session.id)
self._id_map.pop(session.publisher_id)
def pop(self, item): # item can be any of session, session.id or session.publisher_id
session = self._id_map[item] if item in self._id_map else item
self._publishers.remove(session)
self._id_map.pop(session.id)
self._id_map.pop(session.publisher_id)
return session
def clear(self):
self._publishers.clear()
self._id_map.clear()
def __len__(self):
return len(self._publishers)
def __iter__(self):
return iter(self._publishers)
def __getitem__(self, key):
return self._id_map[key]
def __contains__(self, item):
return item in self._id_map or item in self._publishers
class VideoRoomSessionInfo(object):
slow_download = SlowLinkDescriptor()
slow_upload = SlowLinkDescriptor()
def __init__(self, id, owner):
self.id = id
self.owner = owner
self.account_id = None
self.type = None # publisher / subscriber
self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers
self.janus_handle_id = None
self.room = None
self.bitrate = None
self.parent_session = None
self.slow_download = False
self.slow_upload = False
self.feeds = PublisherFeedContainer() # keeps references to all the other participant's publisher feeds that we subscribed to
def initialize(self, account_id, type, room):
assert type in ('publisher', 'subscriber')
self.account_id = account_id
self.type = type
self.room = room
self.bitrate = room.config.max_bitrate
def __repr__(self):
return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type)
class SessionContainer(object):
def __init__(self):
self._sessions = set()
self._id_map = {} # map session.id -> session and session.janus_handle_id -> session
def add(self, session):
assert session not in self._sessions
assert session.id not in self._id_map and session.janus_handle_id not in self._id_map
self._sessions.add(session)
self._id_map[session.id] = self._id_map[session.janus_handle_id] = session
def discard(self, item): # item can be any of session, session.id or session.janus_handle_id
session = self._id_map[item] if item in self._id_map else item if item in self._sessions else None
if session is not None:
self._sessions.discard(session)
self._id_map.pop(session.id, None)
self._id_map.pop(session.janus_handle_id, None)
def remove(self, item): # item can be any of session, session.id or session.janus_handle_id
session = self._id_map[item] if item in self._id_map else item
self._sessions.remove(session)
self._id_map.pop(session.id)
self._id_map.pop(session.janus_handle_id)
def pop(self, item): # item can be any of session, session.id or session.janus_handle_id
session = self._id_map[item] if item in self._id_map else item
self._sessions.remove(session)
self._id_map.pop(session.id)
self._id_map.pop(session.janus_handle_id)
return session
def clear(self):
self._sessions.clear()
self._id_map.clear()
def __len__(self):
return len(self._sessions)
def __iter__(self):
return iter(self._sessions)
def __getitem__(self, key):
return self._id_map[key]
def __contains__(self, item):
return item in self._id_map or item in self._sessions
+class OperationName(str):
+ __normalizer__ = maketrans('-', '_')
+
+ @property
+ def normalized(self):
+ return self.translate(self.__normalizer__)
+
+
class Operation(object):
- __slots__ = 'name', 'data'
+ __slots__ = 'type', 'name', 'data'
+ __types__ = 'Request', 'Event'
- def __init__(self, name, data):
- self.name = name
+ # noinspection PyShadowingBuiltins
+ def __init__(self, type, name, data):
+ if type not in self.__types__:
+ raise ValueError("Can't instantiate class {.__class__.__name__} with unknown type: {!r}".format(self, type))
+ self.type = type
+ self.name = OperationName(name)
self.data = data
class APIError(Exception):
pass
class ConnectionHandler(object):
def __init__(self, protocol):
self.protocol = protocol
self.device_id = hashlib.md5(protocol.peer).digest().encode('base64').rstrip('=\n')
self.janus_session_id = None
self.accounts_map = {} # account ID -> account
self.account_handles_map = {} # Janus handle ID -> account
self.sip_sessions = SessionContainer() # keeps references to all the SIP sessions created or received by this device
self.videoroom_sessions = SessionContainer() # keeps references to all the videoroom sessions created by this participant (as publisher and subscriber)
self.ready_event = GreenEvent()
self.resolver = DNSLookup()
self.proc = proc.spawn(self._operations_handler)
self.operations_queue = coros.queue()
self.log = ConnectionLogger(self)
def start(self):
self._create_janus_session()
def stop(self):
if self.proc is not None: # Kill the operation's handler proc first, in order to not have any operations active while we cleanup.
self.proc.kill() # Also proc.kill() will switch to another green thread, which is another reason to do it first so that
self.proc = None # we do not switch to another green thread in the middle of the cleanup with a partially deleted handler
if self.ready_event.is_set():
assert self.janus_session_id is not None
for account_info in self.accounts_map.values():
handle_id = account_info.janus_handle_id
if handle_id is not None:
self.protocol.backend.janus_detach(self.janus_session_id, handle_id)
self.protocol.backend.janus_set_event_handler(handle_id, None)
for session in self.sip_sessions:
handle_id = session.janus_handle_id
if handle_id is not None:
self.protocol.backend.janus_detach(self.janus_session_id, handle_id)
self.protocol.backend.janus_set_event_handler(handle_id, None)
for session in self.videoroom_sessions:
handle_id = session.janus_handle_id
if handle_id is not None:
self.protocol.backend.janus_detach(self.janus_session_id, handle_id)
self.protocol.backend.janus_set_event_handler(handle_id, None)
session.room.discard(session)
session.feeds.clear()
self.protocol.backend.janus_stop_keepalive(self.janus_session_id)
self.protocol.backend.janus_destroy_session(self.janus_session_id)
# cleanup
self.ready_event.clear()
self.accounts_map.clear()
self.account_handles_map.clear()
self.sip_sessions.clear()
self.videoroom_sessions.clear()
self.janus_session_id = None
self.protocol = None
def handle_message(self, data):
try:
request_type = data.pop('sylkrtc')
except KeyError:
self.log.error('could not get WebSocket message type')
return
self.ready_event.wait()
try:
model = sylkrtc_models[request_type]
except KeyError:
self.log.error('unknown WebSocket request type: %s' % request_type)
return
try:
request = model(**data)
request.validate()
except Exception as e:
self.log.error('%s: %s' % (request_type, e))
transaction = data.get('transaction') # we cannot rely on request.transaction as request may not have been created
if transaction:
self._send_response(sylkrtc.ErrorResponse(transaction=transaction, error=str(e)))
return
- op = Operation(request_type, request)
- self.operations_queue.send(op)
+ operation = Operation(type='Request', name=request_type, data=request)
+ self.operations_queue.send(operation)
def handle_conference_invite(self, originator, room, invited_uris):
for account_id in set(self.accounts_map).intersection(invited_uris):
data = dict(sylkrtc='account_event', event='conference_invite', account=account_id, data=dict(originator=dict(uri=originator.id, display_name=originator.display_name), room=room.uri))
room.log.info('invitation from %s for %s', originator.id, account_id)
self.log.info('received an invitation from %s for %s to join video room %s', originator.id, account_id, room.uri)
self._send_data(json.dumps(data))
def notify(self, event):
event.validate()
self._send_data(json.dumps(event.to_struct()))
# internal methods (not overriding / implementing the protocol API)
def _send_response(self, response):
response.validate()
self._send_data(json.dumps(response.to_struct()))
def _send_data(self, data):
self.protocol.sendMessage(data)
def _cleanup_session(self, session):
# should only be called from a green thread.
if self.janus_session_id is None: # The connection was closed, there is noting to do
return
if session in self.sip_sessions:
self.sip_sessions.remove(session)
if session.direction == 'outgoing':
# Destroy plugin handle for outgoing sessions. For incoming ones it's the same as the account handle, so don't
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
def _cleanup_videoroom_session(self, session):
# should only be called from a green thread.
if self.janus_session_id is None: # The connection was closed, there is noting to do
return
if session in self.videoroom_sessions:
self.videoroom_sessions.remove(session)
if session.type == 'publisher':
session.room.discard(session)
session.feeds.clear()
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
self._maybe_destroy_videoroom(session.room)
else:
session.parent_session.feeds.discard(session.publisher_id)
block_on(self.protocol.backend.janus_detach(self.janus_session_id, session.janus_handle_id))
self.protocol.backend.janus_set_event_handler(session.janus_handle_id, None)
def _maybe_destroy_videoroom(self, videoroom):
# should only be called from a green thread.
if self.protocol is None or self.janus_session_id is None: # The connection was closed, there is nothing to do
return
if videoroom in self.protocol.factory.videorooms and not videoroom:
self.protocol.factory.videorooms.remove(videoroom)
# create a handle to do the cleanup
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
data = dict(request='destroy', room=videoroom.id)
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except JanusError:
pass
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
videoroom.log.info('destroyed')
@run_in_green_thread
def _create_janus_session(self):
if self.ready_event.is_set():
self._send_response(sylkrtc.ReadyEvent())
return
try:
self.janus_session_id = block_on(self.protocol.backend.janus_create_session())
self.protocol.backend.janus_start_keepalive(self.janus_session_id)
except Exception as e:
self.log.warning('could not create session, disconnecting: %s' % e)
self.protocol.disconnect(3000, unicode(e))
return
self._send_response(sylkrtc.ReadyEvent())
self.ready_event.set()
def _lookup_sip_proxy(self, uri):
# The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed
# as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use
# caching to avoid long delays, we randomize the results matching the highest priority route's
# transport.
proxy = GeneralConfig.outbound_sip_proxy
if proxy is not None:
sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport})
else:
sip_uri = SIPURI.parse('sip:%s' % uri)
settings = SIPSimpleSettings()
try:
routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait()
except DNSLookupError as e:
raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e))
if not routes:
raise DNSLookupError('DNS lookup error: no results found')
route = random.choice([r for r in routes if r.transport == routes[0].transport])
self.log.debug('DNS lookup for SIP proxy for {} yielded {}'.format(uri, route))
# Build a proxy URI Sofia-SIP likes
return 'sips:{route.address}:{route.port}'.format(route=route) if route.transport == 'tls' else str(route.uri)
def _handle_janus_event_sip(self, handle_id, event_type, event):
# TODO: use a model
self.log.debug('janus SIP event: type={event_type} handle_id={handle_id} event={event}'.format(event_type=event_type, handle_id=handle_id, event=event))
- op = Operation('janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event))
- self.operations_queue.send(op)
+ operation = Operation(type='Event', name='janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event))
+ self.operations_queue.send(operation)
def _handle_janus_event_videoroom(self, handle_id, event_type, event):
# TODO: use a model
self.log.debug('janus video room event: type={event_type} handle_id={handle_id} event={event}'.format(event_type=event_type, handle_id=handle_id, event=event))
- op = Operation('janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event))
- self.operations_queue.send(op)
+ operation = Operation(type='Event', name='janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event))
+ self.operations_queue.send(operation)
def _operations_handler(self):
while True:
- op = self.operations_queue.wait()
- handler = getattr(self, '_OH_%s' % op.name.replace('-', '_'), Null)
+ operation = self.operations_queue.wait()
+ handler = getattr(self, '_OH_' + operation.type)
try:
- handler(op.data)
+ handler(operation)
except Exception as e:
- self.log.exception('unhandled exception in operation {operation.name!r}: {exception!s}'.format(operation=op, exception=e))
- del op, handler
+ self.log.exception('unhandled exception in operation {operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e))
+ del operation, handler
+
+ def _OH_Request(self, operation):
+ handler = getattr(self, '_RH_' + operation.name.normalized)
+ handler(operation.data)
+
+ def _OH_Event(self, operation):
+ handler = getattr(self, '_EH_' + operation.name.normalized)
+ handler(operation.data)
+
+ # Request handlers
- def _OH_account_add(self, request):
+ def _RH_account_add(self, request):
try:
if request.account in self.accounts_map:
raise APIError('Account {request.account} already added'.format(request=request))
# check if domain is acceptable
domain = request.account.partition('@')[2]
if not {'*', domain}.intersection(GeneralConfig.sip_domains):
raise APIError('SIP domain not allowed: %s' % domain)
# Create and store our mapping
account_info = AccountInfo(request.account, request.password, request.display_name, request.user_agent)
self.accounts_map[account_info.id] = account_info
except APIError as e:
self.log.error('account-add: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('added account {request.account} using {request.user_agent}'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_account_remove(self, request):
+ def _RH_account_remove(self, request):
try:
try:
account_info = self.accounts_map.pop(request.account)
except KeyError:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
# cleanup in case the client didn't unregister before removing the account
handle_id = account_info.janus_handle_id
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
except (APIError, JanusError) as e:
self.log.error('account-remove: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('removed account {request.account}'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_account_register(self, request):
+ def _RH_account_register(self, request):
try:
try:
account_info = self.accounts_map[request.account]
except KeyError:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
proxy = self._lookup_sip_proxy(request.account)
handle_id = account_info.janus_handle_id
if handle_id is not None:
# Destroy the existing plugin handle
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self.account_handles_map.pop(handle_id)
account_info.janus_handle_id = None
# Create a plugin handle
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
account_info.janus_handle_id = handle_id
self.account_handles_map[handle_id] = account_info
data = dict(request='register', proxy=proxy, **account_info.user_data)
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except (APIError, DNSLookupError, JanusError) as e:
self.log.error('account-register: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_account_unregister(self, request):
+ def _RH_account_unregister(self, request):
try:
try:
account_info = self.accounts_map[request.account]
except KeyError:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
handle_id = account_info.janus_handle_id
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
account_info.janus_handle_id = None
self.account_handles_map.pop(handle_id)
except (APIError, JanusError) as e:
self.log.error('account-unregister: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('unregistered {request.account} from receiving incoming calls'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_account_devicetoken(self, request):
+ def _RH_account_devicetoken(self, request):
try:
if request.account not in self.accounts_map:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
storage = TokenStorage()
if request.old_token is not None:
storage.remove(request.account, request.old_token)
self.log.debug('removed token {request.old_token} for {request.account}'.format(request=request))
if request.new_token is not None:
storage.add(request.account, request.new_token)
self.log.debug('added token {request.new_token} for {request.account}'.format(request=request))
except APIError as e:
self.log.error('account-devicetoken: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_session_create(self, request):
+ def _RH_session_create(self, request):
handle_id = None
try:
if request.session in self.sip_sessions:
raise APIError('Session ID {request.session} already in use'.format(request=request))
try:
account_info = self.accounts_map[request.account]
except KeyError:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
proxy = self._lookup_sip_proxy(request.uri)
# Create a new plugin handle and 'register' it, without actually doing so
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.sip'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_sip)
data = dict(request='register', send_register=False, proxy=proxy, **account_info.user_data)
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
data = dict(request='call', uri='sip:%s' % SIP_PREFIX_RE.sub('', request.uri), srtp='sdes_optional')
jsep = dict(type='offer', sdp=request.sdp)
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
session_info = SIPSessionInfo(request.session)
session_info.janus_handle_id = handle_id
session_info.init_outgoing(request.account, request.uri)
self.sip_sessions.add(session_info)
except (APIError, DNSLookupError, JanusError) as e:
self.log.error('session-create: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
else:
self.log.info('created outgoing session {request.session} from {request.account} to {request.uri}'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_session_answer(self, request):
+ def _RH_session_answer(self, request):
try:
try:
session_info = self.sip_sessions[request.session]
except KeyError:
raise APIError('Unknown session specified: {request.session}'.format(request=request))
if session_info.direction != 'incoming':
raise APIError('Cannot answer outgoing session {request.session}'.format(request=request))
if session_info.state != 'connecting':
raise APIError('Invalid state for answering session {session.id}: {session.state}'.format(session=session_info))
data = dict(request='accept')
jsep = dict(type='answer', sdp=request.sdp)
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data, jsep))
except (APIError, JanusError) as e:
self.log.error('session-answer: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('answered incoming session {session.id}'.format(session=session_info))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_session_trickle(self, request):
+ def _RH_session_trickle(self, request):
try:
try:
session_info = self.sip_sessions[request.session]
except KeyError:
raise APIError('Unknown session specified: {request.session}'.format(request=request))
if session_info.state == 'terminated':
raise APIError('Session {request.session} is terminated'.format(request=request))
candidates = [c.to_struct() for c in request.candidates]
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, session_info.janus_handle_id, candidates))
except (APIError, JanusError) as e:
self.log.error('session-trickle: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
if not candidates:
self.log.debug('session {session.id} negotiated ICE'.format(session=session_info))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_session_terminate(self, request):
+ def _RH_session_terminate(self, request):
try:
try:
session_info = self.sip_sessions[request.session]
except KeyError:
raise APIError('Unknown session specified: {request.session}'.format(request=request))
if session_info.state not in ('connecting', 'progress', 'accepted', 'established'):
raise APIError('Invalid state for terminating session {session.id}: {session.state}'.format(session=session_info))
if session_info.direction == 'incoming' and session_info.state == 'connecting':
data = dict(request='decline', code=486)
else:
data = dict(request='hangup')
block_on(self.protocol.backend.janus_message(self.janus_session_id, session_info.janus_handle_id, data))
except (APIError, JanusError) as e:
self.log.error('session-terminate: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('requested termination for session {session.id}'.format(session=session_info))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_videoroom_join(self, request):
+ def _RH_videoroom_join(self, request):
videoroom = None
handle_id = None
try:
if request.session in self.videoroom_sessions:
raise APIError('Session ID {request.session} already in use'.format(request=request))
try:
account_info = self.accounts_map[request.account]
except KeyError:
raise APIError('Unknown account specified: {request.account}'.format(request=request))
try:
videoroom = self.protocol.factory.videorooms[request.uri]
except KeyError:
videoroom = VideoRoom(request.uri)
self.protocol.factory.videorooms.add(videoroom)
if not videoroom.allow_uri(request.account):
raise APIError('{request.account} is not allowed to join room {request.uri}'.format(request=request))
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# create the room if it doesn't exist
config = videoroom.config
data = dict(request='create', room=videoroom.id, publishers=10, bitrate=config.max_bitrate, videocodec=config.video_codec, record=config.record, rec_dir=config.recording_dir)
try:
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except JanusError as e:
if e.code != 427: # 427 means room already exists
raise
# join the room
data = dict(request='joinandconfigure', room=videoroom.id, ptype='publisher', audio=True, video=True)
if account_info.display_name:
data.update(display=account_info.display_name)
jsep = dict(type='offer', sdp=request.sdp)
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data, jsep))
videoroom_session = VideoRoomSessionInfo(request.session, owner=self)
videoroom_session.janus_handle_id = handle_id
videoroom_session.initialize(request.account, 'publisher', videoroom)
self.videoroom_sessions.add(videoroom_session)
except (APIError, JanusError) as e:
self.log.error('videoroom-join: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
if handle_id is not None:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
self.protocol.backend.janus_set_event_handler(handle_id, None)
self._maybe_destroy_videoroom(videoroom)
else:
self.log.info('created session {request.session} from account {request.account} to video room {request.uri}'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='progress'))
self._send_data(json.dumps(data))
- def _OH_videoroom_ctl(self, request):
+ def _RH_videoroom_ctl(self, request):
try:
option_name = request.option.replace('-', '_')
if getattr(request, option_name) is None:
raise APIError('missing {!r} field in request'.format(option_name))
- sub_handler = getattr(self, '_OH_videoroom_ctl_{}'.format(option_name))
+ sub_handler = getattr(self, '_RH_videoroom_ctl_{}'.format(option_name))
sub_handler(request)
except (APIError, JanusError) as e:
self.log.error('videoroom-ctl: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
- def _OH_videoroom_ctl_configure_room(self, request):
+ def _RH_videoroom_ctl_configure_room(self, request):
try:
videoroom_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('configure-room: unknown video room session: {request.session}'.format(request=request))
videoroom = videoroom_session.room
# todo: should we send out events if the active participant list did not change?
try:
videoroom.active_participants = request.configure_room.active_participants
except ValueError as e:
raise APIError('configure-room: {exception!s}'.format(exception=e))
for session in videoroom:
session.owner.notify(sylkrtc.VideoRoomConfigurationEvent(session=session.id, active_participants=videoroom.active_participants, originator=request.session))
- def _OH_videoroom_ctl_feed_attach(self, request):
+ def _RH_videoroom_ctl_feed_attach(self, request):
if request.feed_attach.session in self.videoroom_sessions:
raise APIError('feed-attach: video room session ID {request.feed_attach.session} already in use'.format(request=request))
# get the 'base' session, the one used to join and publish
try:
base_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('feed-attach: unknown video room session: {request.session}'.format(request=request))
# get the publisher's session
try:
publisher_session = base_session.room[request.feed_attach.publisher]
except KeyError:
raise APIError('feed-attach: unknown publisher video room session to attach to: {request.feed_attach.publisher}'.format(request=request))
if publisher_session.publisher_id is None:
raise APIError('feed-attach: video room session {session.id} does not have a publisher ID'.format(session=publisher_session))
handle_id = block_on(self.protocol.backend.janus_attach(self.janus_session_id, 'janus.plugin.videoroom'))
self.protocol.backend.janus_set_event_handler(handle_id, self._handle_janus_event_videoroom)
# join the room as a listener
try:
data = dict(request='join', room=base_session.room.id, ptype='listener', feed=publisher_session.publisher_id)
block_on(self.protocol.backend.janus_message(self.janus_session_id, handle_id, data))
except JanusError:
try:
block_on(self.protocol.backend.janus_detach(self.janus_session_id, handle_id))
except JanusError:
pass
self.protocol.backend.janus_set_event_handler(handle_id, None)
raise
videoroom_session = VideoRoomSessionInfo(request.feed_attach.session, owner=self)
videoroom_session.janus_handle_id = handle_id
videoroom_session.parent_session = base_session
videoroom_session.publisher_id = publisher_session.id
videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room)
self.videoroom_sessions.add(videoroom_session)
base_session.feeds.add(publisher_session)
- def _OH_videoroom_ctl_feed_answer(self, request):
+ def _RH_videoroom_ctl_feed_answer(self, request):
try:
videoroom_session = self.videoroom_sessions[request.feed_answer.session]
except KeyError:
raise APIError('feed-answer: unknown video room session: {request.feed_answer.session}'.format(request=request))
data = dict(request='start', room=videoroom_session.room.id)
jsep = dict(type='answer', sdp=request.feed_answer.sdp)
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep))
- def _OH_videoroom_ctl_feed_detach(self, request):
+ def _RH_videoroom_ctl_feed_detach(self, request):
try:
videoroom_session = self.videoroom_sessions[request.feed_detach.session]
except KeyError:
raise APIError('feed-detach: unknown video room session to detach: {request.feed_detach.session}'.format(request=request))
if videoroom_session.parent_session.id != request.session:
raise APIError('feed-detach: {request.feed_detach.session} is not an attached feed of {request.session}'.format(request=request))
data = dict(request='leave')
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
self._cleanup_videoroom_session(videoroom_session)
- def _OH_videoroom_ctl_invite_participants(self, request):
+ def _RH_videoroom_ctl_invite_participants(self, request):
try:
base_session = self.videoroom_sessions[request.session]
account_info = self.accounts_map[base_session.account_id]
except KeyError:
raise APIError('invite-participants: unknown video room session: {request.session}'.format(request=request))
for protocol in self.protocol.factory.connections.difference([self.protocol]):
protocol.connection_handler.handle_conference_invite(account_info, base_session.room, request.invite_participants.participants)
- def _OH_videoroom_ctl_trickle(self, request):
+ def _RH_videoroom_ctl_trickle(self, request):
session = request.trickle.session or request.session
try:
videoroom_session = self.videoroom_sessions[session]
except KeyError:
raise APIError('trickle: unknown video room session: {session}'.format(session=session))
candidates = [c.to_struct() for c in request.trickle.candidates]
block_on(self.protocol.backend.janus_trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates))
if not candidates and videoroom_session.type == 'publisher':
self.log.debug('video room session {session.id} negotiated ICE'.format(session=videoroom_session))
- def _OH_videoroom_ctl_update(self, request):
+ def _RH_videoroom_ctl_update(self, request):
try:
videoroom_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('update: unknown video room session: {request.session}'.format(request=request))
update_data = request.update.to_struct()
if update_data:
data = dict(request='configure', room=videoroom_session.room.id, **update_data)
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
modified = ', '.join('{}={}'.format(key, update_data[key]) for key in update_data)
self.log.info('updated video room session {request.session} with {modified}'.format(request=request, modified=modified))
- def _OH_videoroom_terminate(self, request):
+ def _RH_videoroom_terminate(self, request):
try:
try:
videoroom_session = self.videoroom_sessions[request.session]
except KeyError:
raise APIError('Unknown video room session: {request.session}'.format(request=request))
data = dict(request='leave')
block_on(self.protocol.backend.janus_message(self.janus_session_id, videoroom_session.janus_handle_id, data))
except (APIError, JanusError) as e:
self.log.error('videoroom-terminate: {exception!s}'.format(exception=e))
self._send_response(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e)))
else:
self.log.info('requesting termination for video room session {request.session}'.format(request=request))
self._send_response(sylkrtc.AckResponse(transaction=request.transaction))
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='terminated'))
self._send_data(json.dumps(data))
# safety net in case we do not get any answer for the leave request
# todo: to be adjusted later after pseudo-synchronous communication with janus is implemented
reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session)
# Event handlers
- def _OH_janus_event_sip(self, data):
+ def _EH_janus_event_sip(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
event = data['event']
if event_type == 'event':
self._janus_event_plugin_sip(data)
elif event_type == 'webrtcup':
try:
session_info = self.sip_sessions[handle_id]
except KeyError:
self.log.warning('could not find session for handle ID %s' % handle_id)
return
session_info.state = 'established'
data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state))
self._send_data(json.dumps(data)) # TODO: SessionEvent model
self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info))
self.log.info('established WEBRTC connection for session {session.id}'.format(session=session_info))
elif event_type == 'hangup':
try:
session_info = self.sip_sessions[handle_id]
except KeyError:
return
if session_info.state != 'terminated':
session_info.state = 'terminated'
self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info))
reason = event.get('reason', 'reason unspecified')
data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason))
# TODO: SessionEvent model
self._send_data(json.dumps(data))
self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason))
self._cleanup_session(session_info)
elif event_type in ('media', 'detached'):
# ignore
pass
elif event_type == 'slowlink':
try:
session_info = self.sip_sessions[handle_id]
except KeyError:
self.log.warning('could not find session for handle ID %s' % handle_id)
return
try:
uplink = data['event']['uplink']
except KeyError:
self.log.warning('could not find uplink in slowlink event data')
return
if uplink: # uplink is from janus' point of view
if not session_info.slow_download:
self.log.info('poor download connectivity for session {session.id}'.format(session=session_info))
session_info.slow_download = True
else:
if not session_info.slow_upload:
self.log.info('poor upload connectivity for session {session.id}'.format(session=session_info))
session_info.slow_upload = True
else:
self.log.warning('received unexpected event type: %s' % event_type)
def _janus_event_plugin_sip(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert plugin_data['plugin'] == 'janus.plugin.sip'
event_data = plugin_data['data']
assert event_data.get('sip') == 'event'
if 'result' not in event_data:
self.log.warning('unexpected event: %s' % event)
return
event_data = event_data['result']
jsep = event.get('jsep', None)
event_type = event_data['event']
if event_type in ('registering', 'registered', 'registration_failed', 'incomingcall'):
# skip 'registered' events from session handles
if event_type == 'registered' and event_data['register_sent'] in (False, 'false'):
return
# account event
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
self.log.warning('could not find account for handle ID %s' % handle_id)
return
if event_type == 'incomingcall':
originator_uri = SIP_PREFIX_RE.sub('', event_data['username'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
jsep = event.get('jsep', None)
assert jsep is not None
session_id = uuid.uuid4().hex
session = SIPSessionInfo(session_id)
session.janus_handle_id = handle_id
session.init_incoming(account_info.id, originator_uri, originator_display_name)
self.sip_sessions.add(session)
data = dict(sylkrtc='account_event', account=account_info.id, session=session_id, event='incoming_session', data=dict(originator=session.remote_identity.__dict__, sdp=jsep['sdp']))
self.log.info('received incoming session {session.id} from {session.remote_identity.uri!s} to {session.local_identity.uri!s}'.format(session=session))
else:
registration_state = event_type
if registration_state == 'registration_failed':
registration_state = 'failed'
if account_info.registration_state == registration_state:
return
account_info.registration_state = registration_state
registration_data = dict(state=registration_state)
if registration_state == 'failed':
registration_data['reason'] = reason = '{0[code]} {0[reason]}'.format(event_data)
self.log.info('registration for {account.id} failed: {reason}'.format(account=account_info, reason=reason))
elif registration_state == 'registered':
self.log.info('registered {account.id} to receive incoming calls'.format(account=account_info))
data = dict(sylkrtc='account_event', account=account_info.id, event='registration_state', data=registration_data)
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('calling', 'accepted'):
# session event
try:
session_info = self.sip_sessions[handle_id]
except KeyError:
self.log.warning('could not find session for handle ID %s' % handle_id)
return
state_map = dict(calling='progress', accepted='accepted')
session_info.state = state_map[event_type]
self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info))
data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state))
if session_info.state == 'accepted' and session_info.direction == 'outgoing':
assert jsep is not None
data['data']['sdp'] = jsep['sdp']
self._send_data(json.dumps(data)) # TODO: SessionEvent model
elif event_type == 'hangup': # session hangup event
try:
session_info = self.sip_sessions[handle_id]
except KeyError:
return
if session_info.state != 'terminated':
session_info.state = 'terminated'
self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info))
code = event_data.get('code', 0)
reason = '%d %s' % (code, event_data.get('reason', 'Unknown'))
data = dict(sylkrtc='session_event', session=session_info.id, event='state', data=dict(state=session_info.state, reason=reason))
self._send_data(json.dumps(data)) # TODO: SessionEvent model
if session_info.direction == 'incoming' and code == 487: # check if missed incoming call
data = dict(sylkrtc='account_event', account=session_info.account_id, event='missed_session', data=dict(originator=session_info.remote_identity.__dict__))
self._send_data(json.dumps(data)) # TODO: AccountEvent model
if code >= 300:
self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason))
else:
self.log.info('{session.direction} session {session.id} terminated'.format(session=session_info))
self._cleanup_session(session_info)
elif event_type == 'missed_call':
try:
account_info = self.account_handles_map[handle_id]
except KeyError:
self.log.warning('could not find account for handle ID %s' % handle_id)
return
originator_uri = SIP_PREFIX_RE.sub('', event_data['caller'])
originator_display_name = event_data.get('displayname', '').replace('"', '')
# We have no session, so create an identity object by hand
originator = SessionPartyIdentity(originator_uri, originator_display_name)
data = dict(sylkrtc='account_event', account=account_info.id, event='missed_session', data=dict(originator=originator.__dict__))
self.log.info('missed incoming call from {originator.uri}'.format(originator=originator))
# TODO: AccountEvent model
self._send_data(json.dumps(data))
elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'):
pass # ignore
else:
self.log.warning('unexpected SIP plugin event type: %s' % event_type)
- def _OH_janus_event_videoroom(self, data):
+ def _EH_janus_event_videoroom(self, data):
handle_id = data['handle_id']
event_type = data['event_type']
if event_type == 'event':
self._janus_event_plugin_videoroom(data)
elif event_type == 'webrtcup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
self.log.warning('could not find video room session for handle ID %s during webrtcup event' % handle_id)
return
if videoroom_session.type == 'publisher':
self.log.info('established WEBRTC connection for session {session.id}'.format(session=videoroom_session))
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='established'))
else:
data = dict(sylkrtc='videoroom_event', session=videoroom_session.parent_session.id, event='feed_established', data=dict(state='established', subscription=videoroom_session.id))
self._send_data(json.dumps(data))
elif event_type == 'hangup':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
return
self._cleanup_videoroom_session(videoroom_session)
elif event_type in ('media', 'detached'):
pass
elif event_type == 'slowlink':
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
self.log.warning('could not find video room session for handle ID %s during slowlink event' % handle_id)
return
try:
uplink = data['event']['uplink']
except KeyError:
self.log.error('could not find uplink in slowlink event data')
return
if uplink: # uplink is from janus' point of view
if not videoroom_session.slow_download:
self.log.info('poor download connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session))
videoroom_session.slow_download = True
else:
if not videoroom_session.slow_upload:
self.log.info('poor upload connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session))
videoroom_session.slow_upload = True
else:
self.log.warning('received unexpected event type %s: data=%s' % (event_type, data))
def _janus_event_plugin_videoroom(self, data):
handle_id = data['handle_id']
event = data['event']
plugin_data = event['plugindata']
assert(plugin_data['plugin'] == 'janus.plugin.videoroom')
event_data = event['plugindata']['data']
assert 'videoroom' in event_data
event_type = event_data['videoroom']
if event_type == 'joined':
# a join request succeeded, this is a publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
self.log.warning('could not find video room session for handle ID %s during joined event' % handle_id)
return
self.log.info('joined video room {session.room.uri} with session {session.id}'.format(session=videoroom_session))
videoroom_session.publisher_id = event_data['id']
room = videoroom_session.room
jsep = event.get('jsep', None)
assert jsep is not None
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='state', data=dict(state='accepted', sdp=jsep['sdp']))
self._send_data(json.dumps(data))
# send information about existing publishers
publishers = []
for publisher in event_data['publishers']:
publisher_id = publisher['id']
try:
publisher_session = room[publisher_id]
except KeyError:
self.log.warning('could not find matching session for publisher %s during joined event' % publisher_id)
continue
publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.get('display', '')))
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='initial_publishers', data=dict(publishers=publishers))
self._send_data(json.dumps(data))
room.add(videoroom_session) # adding the session to the room might also trigger sending an event with the active participants which must be sent last
elif event_type == 'event':
if 'publishers' in event_data:
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
self.log.warning('could not find video room session for handle ID %s during publishers event' % handle_id)
return
room = videoroom_session.room
# send information about new publishers
publishers = []
for publisher in event_data['publishers']:
publisher_id = publisher['id']
try:
publisher_session = room[publisher_id]
except KeyError:
self.log.warning('could not find matching session for publisher %s during publishers event' % publisher_id)
continue
publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.get('display', '')))
data = dict(sylkrtc='videoroom_event', session=videoroom_session.id, event='publishers_joined', data=dict(publishers=publishers))
self._send_data(json.dumps(data))
elif 'leaving' in event_data:
janus_publisher_id = event_data['leaving'] # janus_publisher_id == 'ok' when the event is about ourselves leaving the room
try:
base_session = self.videoroom_sessions[handle_id]
except KeyError:
if janus_publisher_id != 'ok':
self.log.warning('could not find video room session for handle ID %s during leaving event' % handle_id)
return
if janus_publisher_id == 'ok':
self.log.info('left video room {session.room.uri} with session {session.id}'.format(session=base_session))
self._cleanup_videoroom_session(base_session)
return
try:
publisher_session = base_session.feeds.pop(janus_publisher_id)
except KeyError:
return
data = dict(sylkrtc='videoroom_event', session=base_session.id, event='publishers_left', data=dict(publishers=[publisher_session.id]))
self._send_data(json.dumps(data))
elif {'started', 'unpublished', 'left', 'configured'}.intersection(event_data):
pass
else:
self.log.warning('received unexpected video room plugin event: type={} data={}'.format(event_type, event_data))
elif event_type == 'attached':
# sent when a feed is subscribed for a given publisher
try:
videoroom_session = self.videoroom_sessions[handle_id]
except KeyError:
self.log.warning('could not find video room session for handle ID %s during attached event' % handle_id)
return
# get the session which originated the subscription
base_session = videoroom_session.parent_session
jsep = event.get('jsep', None)
assert base_session is not None
assert jsep is not None
assert jsep['type'] == 'offer'
data = dict(sylkrtc='videoroom_event', session=base_session.id, event='feed_attached', data=dict(sdp=jsep['sdp'], subscription=videoroom_session.id))
self._send_data(json.dumps(data))
elif event_type == 'slow_link':
pass
else:
self.log.warning('received unexpected video room plugin event: type={} data={}'.format(event_type, event_data))

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:55 AM (1 d, 8 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408916
Default Alt Text
(70 KB)

Event Timeline