diff --git a/debian/control b/debian/control index 52b6048..8c77554 100644 --- a/debian/control +++ b/debian/control @@ -1,35 +1,35 @@ Source: sylkserver Section: net Priority: optional Maintainer: Dan Pascu Uploaders: Adrian Georgescu Build-Depends: debhelper (>= 9), dh-python, dh-systemd, python-all (>= 2.7) Standards-Version: 3.9.8 Package: sylkserver Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 1.4.0), python-eventlib, python-lxml, python-sipsimple (>= 3.0.0), python-twisted, python-klein +Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 1.4.0), python-eventlib, python-lxml, python-sipsimple (>= 3.0.0), python-twisted, python-typing, python-klein Suggests: libavahi-compat-libdnssd1, python-twisted-words, python-wokkel (>= 0.7.0), sylkserver-webrtc-gateway Recommends: sylkserver-sounds Description: Extensible real-time-communications application server SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. Package: sylkserver-sounds Architecture: all Depends: ${misc:Depends}, sylkserver Description: Extensible real-time-communications application server sounds SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. . This package contains sounds used by SylkServer. Package: sylkserver-webrtc-gateway Architecture: all Depends: ${misc:Depends}, sylkserver, janus (>= 0.1.1), python-autobahn (>= 0.10.3) Description: Extensible real-time-communications application server WebRTC gateway SylkServer is an application server that can be programmed to perform SIP end-point applications and act as a gateway between SIP and XMPP domains. . This is a meta-package containing the dependencies required to run the WebRTC gateway application. diff --git a/sylk/applications/webrtcgateway/configuration.py b/sylk/applications/webrtcgateway/configuration.py index a895eae..c9db339 100644 --- a/sylk/applications/webrtcgateway/configuration.py +++ b/sylk/applications/webrtcgateway/configuration.py @@ -1,144 +1,153 @@ import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting from application.configuration.datatypes import NetworkAddress, StringList from sylk.configuration import ServerConfig from sylk.configuration.datatypes import Path, SIPProxyAddress, VideoBitrate, VideoCodec __all__ = 'GeneralConfig', 'JanusConfig', 'get_room_config' # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicyItem(object): def __new__(cls, item): lowercase_item = item.lower() if lowercase_item in ('none', ''): return 'none' elif lowercase_item in ('any', 'all', '*'): return 'all' elif '@' in item: return SIPAddress(item) else: return Domain(item) class PolicySettingValue(object): def __init__(self, value): if isinstance(value, (tuple, list)): items = [str(x) for x in value] elif isinstance(value, str): items = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") self.items = {PolicyItem(item) for item in items} self.items.discard('none') def __repr__(self): return '{0.__class__.__name__}({1})'.format(self, sorted(self.items)) def match(self, uri): if 'all' in self.items: return True elif not self.items: return False uri = re.sub('^(sip:|sips:)', '', str(uri)) domain = uri.split('@')[-1] return uri in self.items or domain in self.items class ManagementInterfaceAddress(NetworkAddress): default_port = 20888 # Configuration objects class GeneralConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'General' web_origins = ConfigSetting(type=StringList, value=['*']) sip_domains = ConfigSetting(type=StringList, value=['*']) outbound_sip_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trace_client = False websocket_ping_interval = 120 recording_dir = ConfigSetting(type=Path, value=Path(os.path.join(ServerConfig.spool_dir.normalized, 'videoconference', 'recordings'))) http_management_interface = ConfigSetting(type=ManagementInterfaceAddress, value=ManagementInterfaceAddress('127.0.0.1')) http_management_auth_secret = ConfigSetting(type=str, value=None) firebase_server_key = ConfigSetting(type=str, value=None) class JanusConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' __section__ = 'Janus' api_url = 'ws://127.0.0.1:8188' api_secret = '0745f2f74f34451c89343afcdcae5809' trace_janus = False max_bitrate = ConfigSetting(type=VideoBitrate, value=VideoBitrate(2016000)) # ~2 MBits/s video_codec = ConfigSetting(type=VideoCodec, value=VideoCodec('vp9')) class RoomConfig(ConfigSection): __cfgfile__ = 'webrtcgateway.ini' record = False access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) max_bitrate = ConfigSetting(type=VideoBitrate, value=JanusConfig.max_bitrate) video_codec = ConfigSetting(type=VideoCodec, value=JanusConfig.video_codec) -class Configuration(object): +class VideoroomConfiguration(object): + video_codec = 'vp9' + max_bitrate = 2016000 + record = False + recording_dir = None + def __init__(self, data): self.__dict__.update(data) + @property + def janus_data(self): + return dict(videocodec=self.video_codec, bitrate=self.max_bitrate, record=self.record, rec_dir=self.recording_dir) + def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) - config = Configuration(dict(RoomConfig)) + config = VideoroomConfiguration(dict(RoomConfig)) RoomConfig.reset() else: - config = Configuration(dict(RoomConfig)) # use room defaults + config = VideoroomConfiguration(dict(RoomConfig)) # use room defaults config.recording_dir = os.path.join(GeneralConfig.recording_dir, room) return config diff --git a/sylk/applications/webrtcgateway/handler.py b/sylk/applications/webrtcgateway/handler.py index 72d243e..1689837 100644 --- a/sylk/applications/webrtcgateway/handler.py +++ b/sylk/applications/webrtcgateway/handler.py @@ -1,1300 +1,1266 @@ import hashlib import json import random -import re import time import uuid from application.python import limit from application.python.weakref import defaultweakobjectmap from application.system import makedirs from eventlib import coros, proc -from eventlib.twistedutil import block_on from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading.green import call_in_green_thread, run_in_green_thread from string import maketrans from twisted.internet import reactor +from typing import Generic, Container, Iterable, Sized, TypeVar, Dict, Set, Optional, Union from sylk.applications.webrtcgateway.configuration import GeneralConfig, get_room_config -from sylk.applications.webrtcgateway.janus import JanusBackend, JanusError +from sylk.applications.webrtcgateway.janus import JanusBackend, JanusError, JanusSession, SIPPluginHandle, VideoroomPluginHandle from sylk.applications.webrtcgateway.logger import ConnectionLogger, VideoroomLogger -from sylk.applications.webrtcgateway.models import sylkrtc +from sylk.applications.webrtcgateway.models import sylkrtc, janus from sylk.applications.webrtcgateway.storage import TokenStorage -SIP_PREFIX_RE = re.compile('^sips?:') - - class AccountInfo(object): + # noinspection PyShadowingBuiltins def __init__(self, id, password, display_name=None, user_agent=None): self.id = id self.password = password self.display_name = display_name self.user_agent = user_agent self.registration_state = None - self.janus_handle_id = None + self.janus_handle = None # type: Optional[SIPPluginHandle] @property def uri(self): return 'sip:' + self.id @property def user_data(self): return dict(username=self.uri, display_name=self.display_name, user_agent=self.user_agent, ha1_secret=self.password) class SessionPartyIdentity(object): - def __init__(self, uri, display_name=''): + def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name # todo: might need to replace this auto-resetting descriptor with a timer in case we need to know when the slow link state expired -# class SlowLinkState(object): def __init__(self): self.slow_link = False self.last_reported = 0 class SlowLinkDescriptor(object): __timeout__ = 30 # 30 seconds def __init__(self): self.values = defaultweakobjectmap(SlowLinkState) def __get__(self, instance, owner): if instance is None: return self state = self.values[instance] if state.slow_link and time.time() - state.last_reported > self.__timeout__: state.slow_link = False return state.slow_link def __set__(self, instance, value): state = self.values[instance] if value: state.last_reported = time.time() state.slow_link = bool(value) def __delete__(self, instance): raise AttributeError('Attribute cannot be deleted') class SIPSessionInfo(object): slow_download = SlowLinkDescriptor() slow_upload = SlowLinkDescriptor() + # noinspection PyShadowingBuiltins def __init__(self, id): self.id = id self.direction = None self.state = None self.account_id = None - self.local_identity = None # instance of SessionPartyIdentity - self.remote_identity = None # instance of SessionPartyIdentity - self.janus_handle_id = None + self.local_identity = None # type: SessionPartyIdentity + self.remote_identity = None # type: SessionPartyIdentity + self.janus_handle = None # type: SIPPluginHandle self.slow_download = False self.slow_upload = False def init_outgoing(self, account_id, destination): self.account_id = account_id self.direction = 'outgoing' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(destination) def init_incoming(self, account_id, originator, originator_display_name=''): self.account_id = account_id self.direction = 'incoming' self.state = 'connecting' self.local_identity = SessionPartyIdentity(account_id) self.remote_identity = SessionPartyIdentity(originator, originator_display_name) +class VideoroomSessionInfo(object): + slow_download = SlowLinkDescriptor() + slow_upload = SlowLinkDescriptor() + + # noinspection PyShadowingBuiltins + def __init__(self, id, owner): + self.id = id + self.owner = owner + self.account_id = None + self.type = None # publisher / subscriber + self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers + self.janus_handle = None # type: VideoroomPluginHandle + self.room = None + self.bitrate = None + self.parent_session = None + self.slow_download = False + self.slow_upload = False + self.feeds = PublisherFeedContainer() # keeps references to all the other participant's publisher feeds that we subscribed to + + # noinspection PyShadowingBuiltins + def initialize(self, account_id, type, room): + assert type in ('publisher', 'subscriber') + self.account_id = account_id + self.type = type + self.room = room + self.bitrate = room.config.max_bitrate + + def __repr__(self): + return '<{0.__class__.__name__}: type={0.type!r} id={0.id!r} janus_handle={0.janus_handle!r}>'.format(self) + + +class PublisherFeedContainer(object): + """A container for the other participant's publisher sessions that we have subscribed to""" + + def __init__(self): + self._publishers = set() + self._id_map = {} # map publisher.id -> publisher and publisher.publisher_id -> publisher + + def add(self, session): + assert session not in self._publishers + assert session.id not in self._id_map and session.publisher_id not in self._id_map + self._publishers.add(session) + self._id_map[session.id] = self._id_map[session.publisher_id] = session + + def discard(self, item): # item can be any of session, session.id or session.publisher_id + session = self._id_map[item] if item in self._id_map else item if item in self._publishers else None + if session is not None: + self._publishers.discard(session) + self._id_map.pop(session.id, None) + self._id_map.pop(session.publisher_id, None) + + def remove(self, item): # item can be any of session, session.id or session.publisher_id + session = self._id_map[item] if item in self._id_map else item + self._publishers.remove(session) + self._id_map.pop(session.id) + self._id_map.pop(session.publisher_id) + + def pop(self, item): # item can be any of session, session.id or session.publisher_id + session = self._id_map[item] if item in self._id_map else item + self._publishers.remove(session) + self._id_map.pop(session.id) + self._id_map.pop(session.publisher_id) + return session + + def clear(self): + self._publishers.clear() + self._id_map.clear() + + def __len__(self): + return len(self._publishers) + + def __iter__(self): + return iter(self._publishers) + + def __getitem__(self, key): + return self._id_map[key] + + def __contains__(self, item): + return item in self._id_map or item in self._publishers + + class Videoroom(object): def __init__(self, uri): self.id = random.getrandbits(32) # janus needs numeric room names self.uri = uri self.config = get_room_config(uri) self.log = VideoroomLogger(self) self._active_participants = [] - self._sessions = set() - self._id_map = {} # map session.id -> session and session.publisher_id -> session + self._sessions = set() # type: Set[VideoroomSessionInfo] + self._id_map = {} # type: Dict[Union[str, int], VideoroomSessionInfo] # map session.id -> session and session.publisher_id -> session if self.config.record: makedirs(self.config.recording_dir, 0o755) self.log.info('created (recording on)') else: self.log.info('created') @property def active_participants(self): return self._active_participants @active_participants.setter def active_participants(self, participant_list): unknown_participants = set(participant_list).difference(self._id_map) if unknown_participants: raise ValueError('unknown participant session id: {}'.format(', '.join(unknown_participants))) if self._active_participants != participant_list: self._active_participants = participant_list self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) self._update_bitrate() def add(self, session): assert session not in self._sessions assert session.publisher_id is not None assert session.publisher_id not in self._id_map and session.id not in self._id_map self._sessions.add(session) self._id_map[session.id] = self._id_map[session.publisher_id] = session self.log.info('{session.account_id} has joined the room'.format(session=session)) self._update_bitrate() if self._active_participants: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) def discard(self, session): if session in self._sessions: self._sessions.discard(session) self._id_map.pop(session.id, None) self._id_map.pop(session.publisher_id, None) self.log.info('{session.account_id} has left the room'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() def remove(self, session): self._sessions.remove(session) self._id_map.pop(session.id) self._id_map.pop(session.publisher_id) self.log.info('{session.account_id} has left the room'.format(session=session)) if session.id in self._active_participants: self._active_participants.remove(session.id) self.log.info('active participants: {}'.format(', '.join(self._active_participants) or None)) for session in self._sessions: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=self._active_participants, originator='videoroom')) self._update_bitrate() def clear(self): for session in self._sessions: self.log.info('{session.account_id} has left the room'.format(session=session)) self._active_participants = [] self._sessions.clear() self._id_map.clear() def allow_uri(self, uri): config = self.config if config.access_policy == 'allow,deny': return config.allow.match(uri) and not config.deny.match(uri) else: return not config.deny.match(uri) or config.allow.match(uri) def _update_bitrate(self): if self._sessions: if self._active_participants: # todo: should we use max_bitrate / 2 or max_bitrate for each active participant if there are 2 active participants? active_participant_bitrate = self.config.max_bitrate // len(self._active_participants) other_participant_bitrate = 100000 self.log.info('participant bitrate is {} (active) / {} (others)'.format(active_participant_bitrate, other_participant_bitrate)) for session in self._sessions: if session.id in self._active_participants: bitrate = active_participant_bitrate else: bitrate = other_participant_bitrate if session.bitrate != bitrate: session.bitrate = bitrate - data = dict(request='configure', room=self.id, bitrate=bitrate) - session.owner.janus.message(session.owner.janus_session_id, session.janus_handle_id, data) + session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), async=True) else: bitrate = self.config.max_bitrate // limit(len(self._sessions) - 1, min=1) self.log.info('participant bitrate is {}'.format(bitrate)) for session in self._sessions: if session.bitrate != bitrate: session.bitrate = bitrate - data = dict(request='configure', room=self.id, bitrate=bitrate) - session.owner.janus.message(session.owner.janus_session_id, session.janus_handle_id, data) + session.janus_handle.message(janus.VideoroomUpdatePublisher(bitrate=bitrate), async=True) # todo: make Videoroom be a context manager that is retained/released on enter/exit and implement __nonzero__ to be different from __len__ # todo: so that a videoroom is not accidentally released by the last participant leaving while a new participant waits to join # todo: this needs a new model for communication with janus and the client that is pseudo-synchronous (uses green threads) def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions -class PublisherFeedContainer(object): - """A container for the other participant's publisher sessions that we have subscribed to""" - - def __init__(self): - self._publishers = set() - self._id_map = {} # map publisher.id -> publisher and publisher.publisher_id -> publisher - - def add(self, session): - assert session not in self._publishers - assert session.id not in self._id_map and session.publisher_id not in self._id_map - self._publishers.add(session) - self._id_map[session.id] = self._id_map[session.publisher_id] = session - - def discard(self, item): # item can be any of session, session.id or session.publisher_id - session = self._id_map[item] if item in self._id_map else item if item in self._publishers else None - if session is not None: - self._publishers.discard(session) - self._id_map.pop(session.id, None) - self._id_map.pop(session.publisher_id, None) +SessionT = TypeVar('SessionT', SIPSessionInfo, VideoroomSessionInfo) - def remove(self, item): # item can be any of session, session.id or session.publisher_id - session = self._id_map[item] if item in self._id_map else item - self._publishers.remove(session) - self._id_map.pop(session.id) - self._id_map.pop(session.publisher_id) - def pop(self, item): # item can be any of session, session.id or session.publisher_id - session = self._id_map[item] if item in self._id_map else item - self._publishers.remove(session) - self._id_map.pop(session.id) - self._id_map.pop(session.publisher_id) - return session - - def clear(self): - self._publishers.clear() - self._id_map.clear() - - def __len__(self): - return len(self._publishers) - - def __iter__(self): - return iter(self._publishers) - - def __getitem__(self, key): - return self._id_map[key] - - def __contains__(self, item): - return item in self._id_map or item in self._publishers - - -class VideoroomSessionInfo(object): - slow_download = SlowLinkDescriptor() - slow_upload = SlowLinkDescriptor() - - def __init__(self, id, owner): - self.id = id - self.owner = owner - self.account_id = None - self.type = None # publisher / subscriber - self.publisher_id = None # janus publisher ID for publishers / publisher session ID for subscribers - self.janus_handle_id = None - self.room = None - self.bitrate = None - self.parent_session = None - self.slow_download = False - self.slow_upload = False - self.feeds = PublisherFeedContainer() # keeps references to all the other participant's publisher feeds that we subscribed to - - def initialize(self, account_id, type, room): - assert type in ('publisher', 'subscriber') - self.account_id = account_id - self.type = type - self.room = room - self.bitrate = room.config.max_bitrate - - def __repr__(self): - return '<%s: id=%s janus_handle_id=%s type=%s>' % (self.__class__.__name__, self.id, self.janus_handle_id, self.type) - - -class SessionContainer(object): +class SessionContainer(Sized, Iterable[SessionT], Container[SessionT], Generic[SessionT]): def __init__(self): self._sessions = set() - self._id_map = {} # map session.id -> session and session.janus_handle_id -> session + self._id_map = {} # map session.id -> session and session.janus_handle.id -> session def add(self, session): assert session not in self._sessions - assert session.id not in self._id_map and session.janus_handle_id not in self._id_map + assert session.id not in self._id_map and session.janus_handle.id not in self._id_map self._sessions.add(session) - self._id_map[session.id] = self._id_map[session.janus_handle_id] = session + self._id_map[session.id] = self._id_map[session.janus_handle.id] = session - def discard(self, item): # item can be any of session, session.id or session.janus_handle_id + def discard(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item if item in self._sessions else None if session is not None: self._sessions.discard(session) self._id_map.pop(session.id, None) - self._id_map.pop(session.janus_handle_id, None) + self._id_map.pop(session.janus_handle.id, None) - def remove(self, item): # item can be any of session, session.id or session.janus_handle_id + def remove(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) - self._id_map.pop(session.janus_handle_id) + self._id_map.pop(session.janus_handle.id) - def pop(self, item): # item can be any of session, session.id or session.janus_handle_id + def pop(self, item): # item can be any of session, session.id or session.janus_handle.id session = self._id_map[item] if item in self._id_map else item self._sessions.remove(session) self._id_map.pop(session.id) - self._id_map.pop(session.janus_handle_id) + self._id_map.pop(session.janus_handle.id) return session def clear(self): self._sessions.clear() self._id_map.clear() def __len__(self): return len(self._sessions) def __iter__(self): return iter(self._sessions) def __getitem__(self, key): return self._id_map[key] def __contains__(self, item): return item in self._id_map or item in self._sessions class OperationName(str): __normalizer__ = maketrans('-', '_') @property def normalized(self): return self.translate(self.__normalizer__) class Operation(object): __slots__ = 'type', 'name', 'data' __types__ = 'request', 'event' # noinspection PyShadowingBuiltins def __init__(self, type, name, data): if type not in self.__types__: raise ValueError("Can't instantiate class {.__class__.__name__} with unknown type: {!r}".format(self, type)) self.type = type self.name = OperationName(name) self.data = data class APIError(Exception): pass class GreenEvent(object): def __init__(self): self._event = coros.event() def set(self): if self._event.ready(): return self._event.send(True) def is_set(self): return self._event.ready() def clear(self): if self._event.ready(): self._event.reset() def wait(self): return self._event.wait() +# noinspection PyPep8Naming class ConnectionHandler(object): janus = JanusBackend() def __init__(self, protocol): self.protocol = protocol self.device_id = hashlib.md5(protocol.peer).digest().encode('base64').rstrip('=\n') - self.janus_session_id = None + self.janus_session = None # type: JanusSession self.accounts_map = {} # account ID -> account self.account_handles_map = {} # Janus handle ID -> account - self.sip_sessions = SessionContainer() # keeps references to all the SIP sessions created or received by this device - self.videoroom_sessions = SessionContainer() # keeps references to all the videoroom sessions created by this participant (as publisher and subscriber) + self.sip_sessions = SessionContainer() # type: SessionContainer[SIPSessionInfo] # incoming and outgoing SIP sessions + self.videoroom_sessions = SessionContainer() # type: SessionContainer[VideoroomSessionInfo] # publisher and subscriber sessions in video rooms self.ready_event = GreenEvent() self.resolver = DNSLookup() self.proc = proc.spawn(self._operations_handler) self.operations_queue = coros.queue() self.log = ConnectionLogger(self) self.state = None self._stop_pending = False @run_in_green_thread def start(self): self.state = 'starting' try: - self.janus_session_id = block_on(self.janus.create_session()) + self.janus_session = JanusSession() except Exception as e: self.state = 'failed' self.log.warning('could not create session, disconnecting: %s' % e) if self._stop_pending: # if stop was already called it means we were already disconnected self.stop() else: self.protocol.disconnect(3000, unicode(e)) else: self.state = 'started' self.ready_event.set() if self._stop_pending: self.stop() else: self.send(sylkrtc.ReadyEvent()) def stop(self): if self.state in (None, 'starting'): self._stop_pending = True return self.state = 'stopping' self._stop_pending = False if self.proc is not None: # Kill the operation's handler proc first, in order to not have any operations active while we cleanup. self.proc.kill() # Also proc.kill() will switch to another green thread, which is another reason to do it first so that self.proc = None # we do not switch to another green thread in the middle of the cleanup with a partially deleted handler if self.ready_event.is_set(): # Do not explicitly detach the janus plugin handles before destroying the janus session. Janus runs each request in a different # thread, so making detach and destroy request without waiting for the detach to finish can result in errors from race conditions. # Because we do not want to wait for them, we will rely instead on the fact that janus automatically detaches the plugin handles # when it destroys a session, so we only remove our event handlers and issue a destroy request for the session. for account_info in self.accounts_map.values(): - if account_info.janus_handle_id is not None: - self.janus.set_event_handler(account_info.janus_handle_id, None) + if account_info.janus_handle is not None: + self.janus.set_event_handler(account_info.janus_handle.id, None) for session in self.sip_sessions: - if session.janus_handle_id is not None: - self.janus.set_event_handler(session.janus_handle_id, None) + if session.janus_handle is not None: + self.janus.set_event_handler(session.janus_handle.id, None) for session in self.videoroom_sessions: - if session.janus_handle_id is not None: - self.janus.set_event_handler(session.janus_handle_id, None) + if session.janus_handle is not None: + self.janus.set_event_handler(session.janus_handle.id, None) session.room.discard(session) session.feeds.clear() - self.janus.destroy_session(self.janus_session_id) # this automatically detaches all plugin handles associated with it, not need to manually do it + self.janus_session.destroy() # this automatically detaches all plugin handles associated with it, no need to manually do it # cleanup self.ready_event.clear() self.accounts_map.clear() self.account_handles_map.clear() self.sip_sessions.clear() self.videoroom_sessions.clear() - self.janus_session_id = None + self.janus_session = None self.protocol = None self.state = 'stopped' def handle_message(self, message): try: request = sylkrtc.SylkRTCRequest.from_message(message) except sylkrtc.ProtocolError as e: self.log.error(str(e)) except Exception as e: self.log.error('{request_type}: {exception!s}'.format(request_type=message['sylkrtc'], exception=e)) if 'transaction' in message: self.send(sylkrtc.ErrorResponse(transaction=message['transaction'], error=str(e))) else: operation = Operation(type='request', name=request.sylkrtc, data=request) self.operations_queue.send(operation) def send(self, message): if self.protocol is not None: self.protocol.sendMessage(json.dumps(message.__data__)) # internal methods (not overriding / implementing the protocol API) def _cleanup_session(self, session): # should only be called from a green thread. - if self.janus_session_id is None: # The connection was closed, there is noting to do + if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.sip_sessions: self.sip_sessions.remove(session) if session.direction == 'outgoing': # Destroy plugin handle for outgoing sessions. For incoming ones it's the same as the account handle, so don't - block_on(self.janus.detach(self.janus_session_id, session.janus_handle_id)) - self.janus.set_event_handler(session.janus_handle_id, None) + session.janus_handle.detach() def _cleanup_videoroom_session(self, session): # should only be called from a green thread. - if self.janus_session_id is None: # The connection was closed, there is noting to do + if self.janus_session is None: # The connection was closed, there is noting to do return if session in self.videoroom_sessions: self.videoroom_sessions.remove(session) if session.type == 'publisher': session.room.discard(session) session.feeds.clear() - block_on(self.janus.detach(self.janus_session_id, session.janus_handle_id)) - self.janus.set_event_handler(session.janus_handle_id, None) + session.janus_handle.detach() self._maybe_destroy_videoroom(session.room) else: session.parent_session.feeds.discard(session.publisher_id) - block_on(self.janus.detach(self.janus_session_id, session.janus_handle_id)) - self.janus.set_event_handler(session.janus_handle_id, None) + session.janus_handle.detach() def _maybe_destroy_videoroom(self, videoroom): # should only be called from a green thread. - if self.protocol is None or self.janus_session_id is None: # The connection was closed, there is nothing to do + if self.protocol is None or self.janus_session is None: # The connection was closed, there is nothing to do return if videoroom in self.protocol.factory.videorooms and not videoroom: self.protocol.factory.videorooms.remove(videoroom) - # create a handle to do the cleanup - handle_id = block_on(self.janus.attach(self.janus_session_id, 'janus.plugin.videoroom')) - self.janus.set_event_handler(handle_id, self._handle_janus_event_videoroom) - data = dict(request='destroy', room=videoroom.id) - try: - block_on(self.janus.message(self.janus_session_id, handle_id, data)) - except JanusError: - pass - block_on(self.janus.detach(self.janus_session_id, handle_id)) - self.janus.set_event_handler(handle_id, None) + with VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) as videoroom_handle: + videoroom_handle.destroy(room=videoroom.id) videoroom.log.info('destroyed') def _lookup_sip_proxy(self, uri): # The proxy dance: Sofia-SIP seems to do a DNS lookup per SIP message when a domain is passed # as the proxy, so do the resolution ourselves and give it pre-resolver proxy URL. Since we use # caching to avoid long delays, we randomize the results matching the highest priority route's # transport. proxy = GeneralConfig.outbound_sip_proxy if proxy is not None: sip_uri = SIPURI(host=proxy.host, port=proxy.port, parameters={'transport': proxy.transport}) else: sip_uri = SIPURI.parse('sip:%s' % uri) settings = SIPSimpleSettings() try: routes = self.resolver.lookup_sip_proxy(sip_uri, settings.sip.transport_list).wait() except DNSLookupError as e: raise DNSLookupError('DNS lookup error: {exception!s}'.format(exception=e)) if not routes: raise DNSLookupError('DNS lookup error: no results found') route = random.choice([r for r in routes if r.transport == routes[0].transport]) self.log.debug('DNS lookup for SIP proxy for {} yielded {}'.format(uri, route)) # Build a proxy URI Sofia-SIP likes return 'sips:{route.address}:{route.port}'.format(route=route) if route.transport == 'tls' else str(route.uri) - def _handle_janus_event_sip(self, handle_id, event_type, event): - # TODO: use a model - self.log.debug('janus SIP event: type={event_type} handle_id={handle_id} event={event}'.format(event_type=event_type, handle_id=handle_id, event=event)) - operation = Operation(type='event', name='janus-event-sip', data=dict(handle_id=handle_id, event_type=event_type, event=event)) + def _handle_janus_sip_event(self, event): + operation = Operation(type='event', name='janus-sip', data=event) self.operations_queue.send(operation) - def _handle_janus_event_videoroom(self, handle_id, event_type, event): - # TODO: use a model - self.log.debug('janus video room event: type={event_type} handle_id={handle_id} event={event}'.format(event_type=event_type, handle_id=handle_id, event=event)) - operation = Operation(type='event', name='janus-event-videoroom', data=dict(handle_id=handle_id, event_type=event_type, event=event)) + def _handle_janus_videoroom_event(self, event): + operation = Operation(type='event', name='janus-videoroom', data=event) self.operations_queue.send(operation) def _operations_handler(self): self.ready_event.wait() while True: operation = self.operations_queue.wait() handler = getattr(self, '_OH_' + operation.type) handler(operation) del operation, handler def _OH_request(self, operation): handler = getattr(self, '_RH_' + operation.name.normalized) request = operation.data try: handler(request) except (APIError, DNSLookupError, JanusError) as e: self.log.error('{operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error=str(e))) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) self.send(sylkrtc.ErrorResponse(transaction=request.transaction, error='Internal error')) else: self.send(sylkrtc.AckResponse(transaction=request.transaction)) def _OH_event(self, operation): handler = getattr(self, '_EH_' + operation.name.normalized) try: handler(operation.data) except Exception as e: self.log.exception('{operation.type} {operation.name}: {exception!s}'.format(operation=operation, exception=e)) # Request handlers def _RH_account_add(self, request): if request.account in self.accounts_map: raise APIError('Account {request.account} already added'.format(request=request)) # check if domain is acceptable domain = request.account.partition('@')[2] if not {'*', domain}.intersection(GeneralConfig.sip_domains): raise APIError('SIP domain not allowed: %s' % domain) # Create and store our mapping account_info = AccountInfo(request.account, request.password, request.display_name, request.user_agent) self.accounts_map[account_info.id] = account_info self.log.info('added account {request.account} using {request.user_agent}'.format(request=request)) def _RH_account_remove(self, request): try: account_info = self.accounts_map.pop(request.account) except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) # cleanup in case the client didn't unregister before removing the account - handle_id = account_info.janus_handle_id - if handle_id is not None: - block_on(self.janus.detach(self.janus_session_id, handle_id)) - self.janus.set_event_handler(handle_id, None) - self.account_handles_map.pop(handle_id) + if account_info.janus_handle is not None: + account_info.janus_handle.detach() + self.account_handles_map.pop(account_info.janus_handle.id) self.log.info('removed account {request.account}'.format(request=request)) def _RH_account_register(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.account) - handle_id = account_info.janus_handle_id - if handle_id is not None: + if account_info.janus_handle is not None: # Destroy the existing plugin handle - block_on(self.janus.detach(self.janus_session_id, handle_id)) - self.janus.set_event_handler(handle_id, None) - self.account_handles_map.pop(handle_id) - account_info.janus_handle_id = None + account_info.janus_handle.detach() + self.account_handles_map.pop(account_info.janus_handle.id) + account_info.janus_handle = None # Create a plugin handle - handle_id = block_on(self.janus.attach(self.janus_session_id, 'janus.plugin.sip')) - self.janus.set_event_handler(handle_id, self._handle_janus_event_sip) - account_info.janus_handle_id = handle_id - self.account_handles_map[handle_id] = account_info + account_info.janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) + self.account_handles_map[account_info.janus_handle.id] = account_info - data = dict(request='register', proxy=proxy, **account_info.user_data) - block_on(self.janus.message(self.janus_session_id, handle_id, data)) + account_info.janus_handle.register(account_info, proxy=proxy) def _RH_account_unregister(self, request): try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) - handle_id = account_info.janus_handle_id - if handle_id is not None: - block_on(self.janus.detach(self.janus_session_id, handle_id)) - self.janus.set_event_handler(handle_id, None) - account_info.janus_handle_id = None - self.account_handles_map.pop(handle_id) + if account_info.janus_handle is not None: + account_info.janus_handle.detach() + self.account_handles_map.pop(account_info.janus_handle.id) + account_info.janus_handle = None self.log.info('unregistered {request.account} from receiving incoming calls'.format(request=request)) def _RH_account_devicetoken(self, request): if request.account not in self.accounts_map: raise APIError('Unknown account specified: {request.account}'.format(request=request)) storage = TokenStorage() if request.old_token is not None: storage.remove(request.account, request.old_token) self.log.debug('removed token {request.old_token} for {request.account}'.format(request=request)) if request.new_token is not None: storage.add(request.account, request.new_token) self.log.debug('added token {request.new_token} for {request.account}'.format(request=request)) def _RH_session_create(self, request): if request.session in self.sip_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) proxy = self._lookup_sip_proxy(request.uri) # Create a new plugin handle and 'register' it, without actually doing so - handle_id = block_on(self.janus.attach(self.janus_session_id, 'janus.plugin.sip')) - self.janus.set_event_handler(handle_id, self._handle_janus_event_sip) + janus_handle = SIPPluginHandle(self.janus_session, event_handler=self._handle_janus_sip_event) try: - data = dict(request='register', send_register=False, proxy=proxy, **account_info.user_data) - block_on(self.janus.message(self.janus_session_id, handle_id, data)) - - data = dict(request='call', uri='sip:%s' % SIP_PREFIX_RE.sub('', request.uri), srtp='sdes_optional') - jsep = dict(type='offer', sdp=request.sdp) - block_on(self.janus.message(self.janus_session_id, handle_id, data, jsep)) - except: - try: - block_on(self.janus.detach(self.janus_session_id, handle_id)) - except JanusError: - pass - self.janus.set_event_handler(handle_id, None) + janus_handle.call(account_info, uri=request.uri, sdp=request.sdp, proxy=proxy) + except Exception: + janus_handle.detach() raise session_info = SIPSessionInfo(request.session) - session_info.janus_handle_id = handle_id + session_info.janus_handle = janus_handle session_info.init_outgoing(request.account, request.uri) self.sip_sessions.add(session_info) self.log.info('created outgoing session {request.session} from {request.account} to {request.uri}'.format(request=request)) def _RH_session_answer(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.direction != 'incoming': raise APIError('Cannot answer outgoing session {request.session}'.format(request=request)) if session_info.state != 'connecting': raise APIError('Invalid state for answering session {session.id}: {session.state}'.format(session=session_info)) - data = dict(request='accept') - jsep = dict(type='answer', sdp=request.sdp) - block_on(self.janus.message(self.janus_session_id, session_info.janus_handle_id, data, jsep)) + session_info.janus_handle.accept(sdp=request.sdp) self.log.info('answered incoming session {session.id}'.format(session=session_info)) def _RH_session_trickle(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.state == 'terminated': raise APIError('Session {request.session} is terminated'.format(request=request)) - candidates = request.candidates.__data__ - - block_on(self.janus.trickle(self.janus_session_id, session_info.janus_handle_id, candidates)) + session_info.janus_handle.trickle(request.candidates) - if not candidates: + if not request.candidates: self.log.debug('session {session.id} negotiated ICE'.format(session=session_info)) def _RH_session_terminate(self, request): try: session_info = self.sip_sessions[request.session] except KeyError: raise APIError('Unknown session specified: {request.session}'.format(request=request)) if session_info.state not in ('connecting', 'progress', 'accepted', 'established'): raise APIError('Invalid state for terminating session {session.id}: {session.state}'.format(session=session_info)) if session_info.direction == 'incoming' and session_info.state == 'connecting': - data = dict(request='decline', code=486) + session_info.janus_handle.decline() else: - data = dict(request='hangup') - block_on(self.janus.message(self.janus_session_id, session_info.janus_handle_id, data)) + session_info.janus_handle.hangup() self.log.info('requested termination for session {session.id}'.format(session=session_info)) def _RH_videoroom_join(self, request): if request.session in self.videoroom_sessions: raise APIError('Session ID {request.session} already in use'.format(request=request)) try: account_info = self.accounts_map[request.account] except KeyError: raise APIError('Unknown account specified: {request.account}'.format(request=request)) try: videoroom = self.protocol.factory.videorooms[request.uri] except KeyError: videoroom = Videoroom(request.uri) self.protocol.factory.videorooms.add(videoroom) if not videoroom.allow_uri(request.account): self._maybe_destroy_videoroom(videoroom) raise APIError('{request.account} is not allowed to join room {request.uri}'.format(request=request)) try: - handle_id = block_on(self.janus.attach(self.janus_session_id, 'janus.plugin.videoroom')) - self.janus.set_event_handler(handle_id, self._handle_janus_event_videoroom) + videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) try: - # create the room if it doesn't exist - config = videoroom.config - data = dict(request='create', room=videoroom.id, publishers=10, bitrate=config.max_bitrate, videocodec=config.video_codec, record=config.record, rec_dir=config.recording_dir) try: - block_on(self.janus.message(self.janus_session_id, handle_id, data)) + videoroom_handle.create(room=videoroom.id, config=videoroom.config, publishers=10) except JanusError as e: if e.code != 427: # 427 means room already exists raise - - # join the room - data = dict(request='joinandconfigure', room=videoroom.id, ptype='publisher', audio=True, video=True) - if account_info.display_name: - data.update(display=account_info.display_name) - jsep = dict(type='offer', sdp=request.sdp) - block_on(self.janus.message(self.janus_session_id, handle_id, data, jsep)) - except: - try: - block_on(self.janus.detach(self.janus_session_id, handle_id)) - except JanusError: - pass - self.janus.set_event_handler(handle_id, None) + videoroom_handle.join(room=videoroom.id, sdp=request.sdp, display_name=account_info.display_name) + except Exception: + videoroom_handle.detach() raise - except: + except Exception: self._maybe_destroy_videoroom(videoroom) raise videoroom_session = VideoroomSessionInfo(request.session, owner=self) - videoroom_session.janus_handle_id = handle_id + videoroom_session.janus_handle = videoroom_handle videoroom_session.initialize(request.account, 'publisher', videoroom) self.videoroom_sessions.add(videoroom_session) self.send(sylkrtc.VideoroomSessionProgressEvent(session=videoroom_session.id)) self.log.info('created session {request.session} from account {request.account} to video room {request.uri}'.format(request=request)) def _RH_videoroom_leave(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) - data = dict(request='leave') - block_on(self.janus.message(self.janus_session_id, videoroom_session.janus_handle_id, data)) + videoroom_session.janus_handle.leave() self.send(sylkrtc.VideoroomSessionTerminatedEvent(session=videoroom_session.id)) # safety net in case we do not get any answer for the leave request # todo: to be adjusted later after pseudo-synchronous communication with janus is implemented reactor.callLater(2, call_in_green_thread, self._cleanup_videoroom_session, videoroom_session) self.log.info('leaving video room session {request.session}'.format(request=request)) def _RH_videoroom_configure(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) videoroom = videoroom_session.room # todo: should we send out events if the active participant list did not change? try: videoroom.active_participants = request.active_participants except ValueError as e: raise APIError(str(e)) for session in videoroom: session.owner.send(sylkrtc.VideoroomConfigureEvent(session=session.id, active_participants=videoroom.active_participants, originator=request.session)) def _RH_videoroom_feed_attach(self, request): if request.feed in self.videoroom_sessions: raise APIError('Video room session ID {request.feed} already in use'.format(request=request)) - # get the 'base' session, the one used to join and publish try: - base_session = self.videoroom_sessions[request.session] + base_session = self.videoroom_sessions[request.session] # our 'base' session (the one used to join and publish) except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) - # get the publisher's session try: - publisher_session = base_session.room[request.publisher] + publisher_session = base_session.room[request.publisher] # the publisher's session (the one we want to subscribe to) except KeyError: raise APIError('Unknown publisher video room session to attach to: {request.publisher}'.format(request=request)) if publisher_session.publisher_id is None: raise APIError('Video room session {session.id} does not have a publisher ID'.format(session=publisher_session)) - handle_id = block_on(self.janus.attach(self.janus_session_id, 'janus.plugin.videoroom')) - self.janus.set_event_handler(handle_id, self._handle_janus_event_videoroom) + videoroom_handle = VideoroomPluginHandle(self.janus_session, event_handler=self._handle_janus_videoroom_event) - # join the room as a listener try: - data = dict(request='join', room=base_session.room.id, ptype='listener', feed=publisher_session.publisher_id) - block_on(self.janus.message(self.janus_session_id, handle_id, data)) - except: - try: - block_on(self.janus.detach(self.janus_session_id, handle_id)) - except JanusError: - pass - self.janus.set_event_handler(handle_id, None) + videoroom_handle.feed_attach(room=base_session.room.id, feed=publisher_session.publisher_id) + except Exception: + videoroom_handle.detach() raise videoroom_session = VideoroomSessionInfo(request.feed, owner=self) - videoroom_session.janus_handle_id = handle_id + videoroom_session.janus_handle = videoroom_handle videoroom_session.parent_session = base_session videoroom_session.publisher_id = publisher_session.id videoroom_session.initialize(base_session.account_id, 'subscriber', base_session.room) self.videoroom_sessions.add(videoroom_session) base_session.feeds.add(publisher_session) def _RH_videoroom_feed_answer(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown video room session: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) - data = dict(request='start', room=videoroom_session.room.id) - jsep = dict(type='answer', sdp=request.sdp) - block_on(self.janus.message(self.janus_session_id, videoroom_session.janus_handle_id, data, jsep)) + videoroom_session.janus_handle.feed_start(sdp=request.sdp) def _RH_videoroom_feed_detach(self, request): try: videoroom_session = self.videoroom_sessions[request.feed] except KeyError: raise APIError('Unknown video room session to detach: {request.feed}'.format(request=request)) if videoroom_session.parent_session.id != request.session: raise APIError('{request.feed} is not an attached feed of {request.session}'.format(request=request)) - data = dict(request='leave') - block_on(self.janus.message(self.janus_session_id, videoroom_session.janus_handle_id, data)) + videoroom_session.janus_handle.feed_detach() self._cleanup_videoroom_session(videoroom_session) def _RH_videoroom_invite(self, request): try: base_session = self.videoroom_sessions[request.session] account_info = self.accounts_map[base_session.account_id] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) room = base_session.room participants = set(request.participants) originator = sylkrtc.SIPIdentity(uri=account_info.id, display_name=account_info.display_name) event = sylkrtc.AccountConferenceInviteEvent(account='placeholder', room=room.uri, originator=originator) for protocol in self.protocol.factory.connections.difference([self.protocol]): connection_handler = protocol.connection_handler for account in participants.intersection(connection_handler.accounts_map): event.account = account connection_handler.send(event) room.log.info('invitation from %s for %s', originator.uri, account) connection_handler.log.info('received an invitation from %s for %s to join video room %s', originator.uri, account, room.uri) def _RH_videoroom_session_trickle(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) - candidates = request.candidates.__data__ - block_on(self.janus.trickle(self.janus_session_id, videoroom_session.janus_handle_id, candidates)) - if not candidates and videoroom_session.type == 'publisher': + videoroom_session.janus_handle.trickle(request.candidates) + if not request.candidates and videoroom_session.type == 'publisher': self.log.debug('video room session {session.id} negotiated ICE'.format(session=videoroom_session)) def _RH_videoroom_session_update(self, request): try: videoroom_session = self.videoroom_sessions[request.session] except KeyError: raise APIError('Unknown video room session: {request.session}'.format(request=request)) - update_data = request.options.__data__ - if update_data: - data = dict(request='configure', room=videoroom_session.room.id, **update_data) - block_on(self.janus.message(self.janus_session_id, videoroom_session.janus_handle_id, data)) - modified = ', '.join('{}={}'.format(key, update_data[key]) for key in update_data) + options = request.options.__data__ + if options: + videoroom_session.janus_handle.update_publisher(options) + modified = ', '.join('{}={}'.format(key, options[key]) for key in options) self.log.info('updated video room session {request.session} with {modified}'.format(request=request, modified=modified)) # Event handlers - def _EH_janus_event_sip(self, data): - handle_id = data['handle_id'] - event_type = data['event_type'] - event = data['event'] - - if event_type == 'event': - self._janus_event_plugin_sip(data) - elif event_type == 'webrtcup': - try: - session_info = self.sip_sessions[handle_id] - except KeyError: - self.log.warning('could not find session for handle ID %s' % handle_id) - return - session_info.state = 'established' - self.send(sylkrtc.SessionEstablishedEvent(session=session_info.id)) - self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) - self.log.info('established WEBRTC connection for session {session.id}'.format(session=session_info)) - elif event_type == 'hangup': - try: - session_info = self.sip_sessions[handle_id] - except KeyError: - return - if session_info.state != 'terminated': - session_info.state = 'terminated' - reason = event.get('reason', 'reason unspecified') - self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) - self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) - self._cleanup_session(session_info) - elif event_type in ('media', 'detached'): - # ignore - pass - elif event_type == 'slowlink': + def _EH_janus_sip(self, event): + if isinstance(event, janus.PluginEvent): + event_id = event.plugindata.data.__id__ try: - session_info = self.sip_sessions[handle_id] - except KeyError: - self.log.warning('could not find session for handle ID %s' % handle_id) - return + handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) + except AttributeError: + self.log.warning('unhandled Janus SIP event: {event_name}'.format(event_name=event_id[-1])) + else: + self.log.debug('janus SIP event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) + handler(event) + else: # janus.CoreEvent try: - uplink = data['event']['uplink'] - except KeyError: - self.log.warning('could not find uplink in slowlink event data') - return - if uplink: # uplink is from janus' point of view - if not session_info.slow_download: - self.log.info('poor download connectivity for session {session.id}'.format(session=session_info)) - session_info.slow_download = True + handler = getattr(self, '_EH_janus_sip_' + event.janus) + except AttributeError: + self.log.warning('unhandled Janus SIP event: {event.janus}'.format(event=event)) else: - if not session_info.slow_upload: - self.log.info('poor upload connectivity for session {session.id}'.format(session=session_info)) - session_info.slow_upload = True + self.log.debug('janus SIP event: {event.janus} (handle_id={event.sender})'.format(event=event)) + handler(event) + + def _EH_janus_sip_error(self, event): + # fixme: implement error handling + self.log.error('got SIP error event: {}'.format(event.__data__)) + handle_id = event.sender + if handle_id in self.sip_sessions: + pass # this is a session related event + elif handle_id in self.account_handles_map: + pass # this is an account related event + + def _EH_janus_sip_webrtcup(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + self.log.warning('could not find SIP session with handle ID {event.sender} for webrtcup event'.format(event=event)) + return + session_info.state = 'established' + self.send(sylkrtc.SessionEstablishedEvent(session=session_info.id)) + self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) + self.log.info('established WEBRTC connection for session {session.id}'.format(session=session_info)) + + def _EH_janus_sip_hangup(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + return + if session_info.state != 'terminated': + session_info.state = 'terminated' + reason = event.reason or 'unspecified reason' + self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) + self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) + self._cleanup_session(session_info) + + def _EH_janus_sip_slowlink(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + self.log.warning('could not find SIP session with handle ID {event.sender} for slowlink event'.format(event=event)) + return + if event.uplink: # uplink is from janus' point of view + if not session_info.slow_download: + self.log.info('poor download connectivity for session {session.id}'.format(session=session_info)) + session_info.slow_download = True else: - self.log.warning('received unexpected event type: %s' % event_type) - - def _janus_event_plugin_sip(self, data): - handle_id = data['handle_id'] - event = data['event'] - plugin_data = event['plugindata'] - assert plugin_data['plugin'] == 'janus.plugin.sip' - event_data = plugin_data['data'] - assert event_data.get('sip') == 'event' - - if 'result' not in event_data: - self.log.warning('unexpected event: %s' % event) + if not session_info.slow_upload: + self.log.info('poor upload connectivity for session {session.id}'.format(session=session_info)) + session_info.slow_upload = True + + def _EH_janus_sip_media(self, event): + pass + + def _EH_janus_sip_detached(self, event): + pass + + def _EH_janus_sip_event_registering(self, event): + try: + account_info = self.account_handles_map[event.sender] + except KeyError: + self.log.warning('could not find account with handle ID {event.sender} for registering event'.format(event=event)) return + if account_info.registration_state != 'registering': + account_info.registration_state = 'registering' + self.send(sylkrtc.AccountRegisteringEvent(account=account_info.id)) - event_data = event_data['result'] - jsep = event.get('jsep', None) - event_type = event_data['event'] - - # if event_type == 'registering': - # account_info = self.account_handles_map.get(handle_id) - # if account_info is None: - # self.log.warning('could not find account for handle ID %s' % handle_id) - # elif account_info.registration_state != event_type: - # account_info.registration_state = event_type - # self.send(sylkrtc.AccountRegisteringEvent(account=account_info.id)) - if event_type == 'registering': - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - self.log.warning('could not find account for handle ID %s' % handle_id) - return - if account_info.registration_state != event_type: - account_info.registration_state = event_type - self.send(sylkrtc.AccountRegisteringEvent(account=account_info.id)) - elif event_type == 'registered': - if event_data['register_sent'] in (False, 'false'): # skip 'registered' events from outgoing session handles - return - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - self.log.warning('could not find account for handle ID %s' % handle_id) - return - if account_info.registration_state != event_type: - account_info.registration_state = event_type - self.send(sylkrtc.AccountRegisteredEvent(account=account_info.id)) - self.log.info('registered {account.id} to receive incoming calls'.format(account=account_info)) - elif event_type == 'registration_failed': - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - self.log.warning('could not find account for handle ID %s' % handle_id) - return - if account_info.registration_state != 'failed': - account_info.registration_state = 'failed' - reason = '{0[code]} {0[reason]}'.format(event_data) - self.send(sylkrtc.AccountRegistrationFailedEvent(account=account_info.id, reason=reason)) - self.log.info('registration for {account.id} failed: {reason}'.format(account=account_info, reason=reason)) - elif event_type == 'incomingcall': - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - self.log.warning('could not find account for handle ID %s' % handle_id) - return - jsep = event.get('jsep', None) - assert jsep is not None - originator = sylkrtc.SIPIdentity(uri=SIP_PREFIX_RE.sub('', event_data['username']), display_name=event_data.get('displayname', '').replace('"', '')) - session = SIPSessionInfo(uuid.uuid4().hex) - session.janus_handle_id = handle_id - session.init_incoming(account_info.id, originator.uri, originator.display_name) - self.sip_sessions.add(session) - self.send(sylkrtc.AccountIncomingSessionEvent(account=account_info.id, session=session.id, originator=originator, sdp=jsep['sdp'])) - self.log.info('received incoming session {session.id} from {session.remote_identity.uri!s} to {session.local_identity.uri!s}'.format(session=session)) - elif event_type == 'missed_call': - try: - account_info = self.account_handles_map[handle_id] - except KeyError: - self.log.warning('could not find account for handle ID %s' % handle_id) - return - originator = sylkrtc.SIPIdentity(uri=SIP_PREFIX_RE.sub('', event_data['caller']), display_name=event_data.get('displayname', '').replace('"', '')) - self.send(sylkrtc.AccountMissedSessionEvent(account=account_info.id, originator=originator)) - self.log.info('missed incoming call from {originator.uri}'.format(originator=originator)) - elif event_type == 'calling': - try: - session_info = self.sip_sessions[handle_id] - except KeyError: - self.log.warning('could not find session for handle ID %s' % handle_id) - return - session_info.state = 'progress' - self.send(sylkrtc.SessionProgressEvent(session=session_info.id)) - self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) - elif event_type == 'accepted': - try: - session_info = self.sip_sessions[handle_id] - except KeyError: - self.log.warning('could not find session for handle ID %s' % handle_id) - return - session_info.state = 'accepted' - event = sylkrtc.SessionAcceptedEvent(session=session_info.id) - if session_info.direction == 'outgoing': - assert jsep is not None - event.sdp = jsep['sdp'] - self.send(event) - self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) - elif event_type == 'hangup': - try: - session_info = self.sip_sessions[handle_id] - except KeyError: - return - if session_info.state != 'terminated': - session_info.state = 'terminated' - code = event_data.get('code', 0) - reason = '%d %s' % (code, event_data.get('reason', 'Unknown')) - self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) - if session_info.direction == 'incoming' and code == 487: # incoming call was cancelled -> missed - self.send(sylkrtc.AccountMissedSessionEvent(account=session_info.account_id, originator=session_info.remote_identity.__dict__)) - if code >= 300: - self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) - else: - self.log.info('{session.direction} session {session.id} terminated'.format(session=session_info)) - self._cleanup_session(session_info) - elif event_type in ('ack', 'declining', 'hangingup', 'proceeding'): - pass # ignore + def _EH_janus_sip_event_registered(self, event): + if event.sender in self.sip_sessions: # skip 'registered' events from outgoing session handles + return + try: + account_info = self.account_handles_map[event.sender] + except KeyError: + self.log.warning('could not find account with handle ID {event.sender} for registered event'.format(event=event)) + return + if account_info.registration_state != 'registered': + account_info.registration_state = 'registered' + self.send(sylkrtc.AccountRegisteredEvent(account=account_info.id)) + self.log.info('registered {account.id} to receive incoming calls'.format(account=account_info)) + + def _EH_janus_sip_event_registration_failed(self, event): + try: + account_info = self.account_handles_map[event.sender] + except KeyError: + self.log.warning('could not find account with handle ID {event.sender} for registration failed event'.format(event=event)) + return + if account_info.registration_state != 'failed': + account_info.registration_state = 'failed' + reason = '{result.code} {result.reason}'.format(result=event.plugindata.data.result) + self.send(sylkrtc.AccountRegistrationFailedEvent(account=account_info.id, reason=reason)) + self.log.info('registration for {account.id} failed: {reason}'.format(account=account_info, reason=reason)) + + def _EH_janus_sip_event_incomingcall(self, event): + try: + account_info = self.account_handles_map[event.sender] + except KeyError: + self.log.warning('could not find account with handle ID {event.sender} for incoming call event'.format(event=event)) + return + assert event.jsep is not None + data = event.plugindata.data.result # type: janus.SIPResultIncomingCall + originator = sylkrtc.SIPIdentity(uri=data.username, display_name=data.displayname) + session = SIPSessionInfo(uuid.uuid4().hex) + session.janus_handle = account_info.janus_handle + session.init_incoming(account_info.id, originator.uri, originator.display_name) + self.sip_sessions.add(session) + self.send(sylkrtc.AccountIncomingSessionEvent(account=account_info.id, session=session.id, originator=originator, sdp=event.jsep.sdp)) + self.log.info('received incoming session {session.id} from {session.remote_identity.uri!s} to {session.local_identity.uri!s}'.format(session=session)) + + def _EH_janus_sip_event_missed_call(self, event): + try: + account_info = self.account_handles_map[event.sender] + except KeyError: + self.log.warning('could not find account with handle ID {event.sender} for missed call event'.format(event=event)) + return + data = event.plugindata.data.result # type: janus.SIPResultMissedCall + originator = sylkrtc.SIPIdentity(uri=data.caller, display_name=data.displayname) + self.send(sylkrtc.AccountMissedSessionEvent(account=account_info.id, originator=originator)) + self.log.info('missed incoming call from {originator.uri}'.format(originator=originator)) + + def _EH_janus_sip_event_calling(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + self.log.warning('could not find SIP session with handle ID {event.sender} for calling event'.format(event=event)) + return + session_info.state = 'progress' + self.send(sylkrtc.SessionProgressEvent(session=session_info.id)) + self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) + + def _EH_janus_sip_event_accepted(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + self.log.warning('could not find SIP session with handle ID {event.sender} for accepted event'.format(event=event)) + return + session_info.state = 'accepted' + if session_info.direction == 'outgoing': + assert event.jsep is not None + self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id, sdp=event.jsep.sdp)) else: - self.log.warning('unexpected SIP plugin event type: %s' % event_type) + self.send(sylkrtc.SessionAcceptedEvent(session=session_info.id)) + self.log.debug('{session.direction} session {session.id} state: {session.state}'.format(session=session_info)) - def _EH_janus_event_videoroom(self, data): - handle_id = data['handle_id'] - event_type = data['event_type'] + def _EH_janus_sip_event_hangup(self, event): + try: + session_info = self.sip_sessions[event.sender] + except KeyError: + self.log.warning('could not find SIP session with handle ID {event.sender} for hangup event'.format(event=event)) + return + if session_info.state != 'terminated': + session_info.state = 'terminated' + data = event.plugindata.data.result # type: janus.SIPResultHangup + reason = '{0.code} {0.reason}'.format(data) + self.send(sylkrtc.SessionTerminatedEvent(session=session_info.id, reason=reason)) + if session_info.direction == 'incoming' and data.code == 487: # incoming call was cancelled -> missed + self.send(sylkrtc.AccountMissedSessionEvent(account=session_info.account_id, originator=session_info.remote_identity.__dict__)) + if data.code >= 300: + self.log.info('{session.direction} session {session.id} terminated ({reason})'.format(session=session_info, reason=reason)) + else: + self.log.info('{session.direction} session {session.id} terminated'.format(session=session_info)) + self._cleanup_session(session_info) + + def _EH_janus_sip_event_declining(self, event): + pass + + def _EH_janus_sip_event_hangingup(self, event): + pass - if event_type == 'event': - self._janus_event_plugin_videoroom(data) - elif event_type == 'webrtcup': + def _EH_janus_sip_event_proceeding(self, event): + pass + + def _EH_janus_videoroom(self, event): + if isinstance(event, janus.PluginEvent): + event_id = event.plugindata.data.__id__ try: - videoroom_session = self.videoroom_sessions[handle_id] - except KeyError: - self.log.warning('could not find video room session for handle ID %s during webrtcup event' % handle_id) - return - if videoroom_session.type == 'publisher': - self.log.info('established WEBRTC connection for session {session.id}'.format(session=videoroom_session)) - self.send(sylkrtc.VideoroomSessionEstablishedEvent(session=videoroom_session.id)) + handler = getattr(self, '_EH_janus_' + '_'.join(event_id)) + except AttributeError: + self.log.warning('unhandled Janus videoroom event: {event_name}'.format(event_name=event_id[-1])) else: - self.send(sylkrtc.VideoroomFeedEstablishedEvent(session=videoroom_session.parent_session.id, feed=videoroom_session.id)) - elif event_type == 'hangup': + self.log.debug('janus videoroom event: {event_name} (handle_id={event.sender})'.format(event=event, event_name=event_id[-1])) + handler(event) + else: # janus.CoreEvent try: - videoroom_session = self.videoroom_sessions[handle_id] - except KeyError: - return - self._cleanup_videoroom_session(videoroom_session) - elif event_type in ('media', 'detached'): - pass - elif event_type == 'slowlink': - try: - videoroom_session = self.videoroom_sessions[handle_id] - except KeyError: - self.log.warning('could not find video room session for handle ID %s during slowlink event' % handle_id) - return - try: - uplink = data['event']['uplink'] - except KeyError: - self.log.error('could not find uplink in slowlink event data') - return - if uplink: # uplink is from janus' point of view - if not videoroom_session.slow_download: - self.log.info('poor download connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) - videoroom_session.slow_download = True + handler = getattr(self, '_EH_janus_videoroom_' + event.janus) + except AttributeError: + self.log.warning('unhandled Janus videoroom event: {event.janus}'.format(event=event)) else: - if not videoroom_session.slow_upload: - self.log.info('poor upload connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) - videoroom_session.slow_upload = True + self.log.debug('janus videoroom event: {event.janus} (handle_id={event.sender})'.format(event=event)) + handler(event) + + def _EH_janus_videoroom_error(self, event): + # fixme: implement error handling + self.log.error('got videoroom error event: {}'.format(event.__data__)) + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for error event'.format(event=event)) + return + if videoroom_session.type == 'publisher': + pass + else: + pass + + def _EH_janus_videoroom_webrtcup(self, event): + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for webrtcup event'.format(event=event)) + return + if videoroom_session.type == 'publisher': + self.log.info('established WEBRTC connection for session {session.id}'.format(session=videoroom_session)) + self.send(sylkrtc.VideoroomSessionEstablishedEvent(session=videoroom_session.id)) + else: + self.send(sylkrtc.VideoroomFeedEstablishedEvent(session=videoroom_session.parent_session.id, feed=videoroom_session.id)) + + def _EH_janus_videoroom_hangup(self, event): + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + return + self._cleanup_videoroom_session(videoroom_session) + + def _EH_janus_videoroom_slowlink(self, event): + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for slowlink event'.format(event=event)) + return + if event.uplink: # uplink is from janus' point of view + if not videoroom_session.slow_download: + self.log.info('poor download connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) + videoroom_session.slow_download = True else: - self.log.warning('received unexpected event type %s: data=%s' % (event_type, data)) - - def _janus_event_plugin_videoroom(self, data): - handle_id = data['handle_id'] - event = data['event'] - plugin_data = event['plugindata'] - assert(plugin_data['plugin'] == 'janus.plugin.videoroom') - event_data = event['plugindata']['data'] - assert 'videoroom' in event_data - - event_type = event_data['videoroom'] - if event_type == 'joined': - # a join request succeeded, this is a publisher + if not videoroom_session.slow_upload: + self.log.info('poor upload connectivity to video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) + videoroom_session.slow_upload = True + + def _EH_janus_videoroom_media(self, event): + pass + + def _EH_janus_videoroom_detached(self, event): + pass + + def _EH_janus_videoroom_joined(self, event): + # send when a publisher successfully joined a room + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for joined event'.format(event=event)) + return + self.log.info('joined video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) + data = event.plugindata.data # type: janus.VideoroomJoined + videoroom_session.publisher_id = data.id + room = videoroom_session.room + assert event.jsep is not None + self.send(sylkrtc.VideoroomSessionAcceptedEvent(session=videoroom_session.id, sdp=event.jsep.sdp)) + # send information about existing publishers + publishers = [] + for publisher in data.publishers: # type: janus.VideoroomPublisher try: - videoroom_session = self.videoroom_sessions[handle_id] + publisher_session = room[publisher.id] except KeyError: - self.log.warning('could not find video room session for handle ID %s during joined event' % handle_id) - return - self.log.info('joined video room {session.room.uri} with session {session.id}'.format(session=videoroom_session)) - videoroom_session.publisher_id = event_data['id'] - room = videoroom_session.room - jsep = event.get('jsep', None) - assert jsep is not None - self.send(sylkrtc.VideoroomSessionAcceptedEvent(session=videoroom_session.id, sdp=jsep['sdp'])) - # send information about existing publishers - publishers = [] - for publisher in event_data['publishers']: - publisher_id = publisher['id'] - try: - publisher_session = room[publisher_id] - except KeyError: - self.log.warning('could not find matching session for publisher %s during joined event' % publisher_id) - else: - publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.get('display', ''))) - self.send(sylkrtc.VideoroomInitialPublishersEvent(session=videoroom_session.id, publishers=publishers)) - room.add(videoroom_session) # adding the session to the room might also trigger sending an event with the active participants which must be sent last - elif event_type == 'event': - if 'publishers' in event_data: - try: - videoroom_session = self.videoroom_sessions[handle_id] - except KeyError: - self.log.warning('could not find video room session for handle ID %s during publishers event' % handle_id) - return - room = videoroom_session.room - # send information about new publishers - publishers = [] - for publisher in event_data['publishers']: - publisher_id = publisher['id'] - try: - publisher_session = room[publisher_id] - except KeyError: - self.log.warning('could not find matching session for publisher %s during publishers event' % publisher_id) - continue - publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.get('display', ''))) - self.send(sylkrtc.VideoroomPublishersJoinedEvent(session=videoroom_session.id, publishers=publishers)) - elif 'leaving' in event_data: - janus_publisher_id = event_data['leaving'] # janus_publisher_id == 'ok' when the event is about ourselves leaving the room - try: - base_session = self.videoroom_sessions[handle_id] - except KeyError: - if janus_publisher_id != 'ok': - self.log.warning('could not find video room session for handle ID %s during leaving event' % handle_id) - return - if janus_publisher_id == 'ok': - self.log.info('left video room {session.room.uri} with session {session.id}'.format(session=base_session)) - self._cleanup_videoroom_session(base_session) - return - try: - publisher_session = base_session.feeds.pop(janus_publisher_id) - except KeyError: - return - self.send(sylkrtc.VideoroomPublishersLeftEvent(session=base_session.id, publishers=[publisher_session.id])) - elif {'started', 'unpublished', 'left', 'configured'}.intersection(event_data): - pass + self.log.warning('could not find matching session for publisher {publisher.id} during joined event'.format(publisher=publisher)) else: - self.log.warning('received unexpected video room plugin event: type={} data={}'.format(event_type, event_data)) - elif event_type == 'attached': - # sent when a feed is subscribed for a given publisher + publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.display or '')) + self.send(sylkrtc.VideoroomInitialPublishersEvent(session=videoroom_session.id, publishers=publishers)) + room.add(videoroom_session) # adding the session to the room might also trigger sending an event with the active participants which must be sent last + + def _EH_janus_videoroom_attached(self, event): + # sent when a feed is subscribed for a given publisher + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for attached event'.format(event=event)) + return + # get the session which originated the subscription + base_session = videoroom_session.parent_session + assert base_session is not None + assert event.jsep is not None and event.jsep.type == 'offer' + self.send(sylkrtc.VideoroomFeedAttachedEvent(session=base_session.id, feed=videoroom_session.id, sdp=event.jsep.sdp)) + + def _EH_janus_videoroom_slow_link(self, event): + pass + + def _EH_janus_videoroom_event_publishers(self, event): + try: + videoroom_session = self.videoroom_sessions[event.sender] + except KeyError: + self.log.warning('could not find video room session with handle ID {event.sender} for publishers event'.format(event=event)) + return + room = videoroom_session.room + # send information about new publishers + publishers = [] + for publisher in event.plugindata.data.publishers: # type: janus.VideoroomPublisher try: - videoroom_session = self.videoroom_sessions[handle_id] + publisher_session = room[publisher.id] except KeyError: - self.log.warning('could not find video room session for handle ID %s during attached event' % handle_id) - return - # get the session which originated the subscription - base_session = videoroom_session.parent_session - jsep = event.get('jsep', None) - assert base_session is not None - assert jsep is not None - assert jsep['type'] == 'offer' - self.send(sylkrtc.VideoroomFeedAttachedEvent(session=base_session.id, feed=videoroom_session.id, sdp=jsep['sdp'])) - elif event_type == 'slow_link': - pass - else: - self.log.warning('received unexpected video room plugin event: type={} data={}'.format(event_type, event_data)) + self.log.warning('could not find matching session for publisher {publisher.id} during publishers event'.format(publisher=publisher)) + continue + publishers.append(dict(id=publisher_session.id, uri=publisher_session.account_id, display_name=publisher.display or '')) + self.send(sylkrtc.VideoroomPublishersJoinedEvent(session=videoroom_session.id, publishers=publishers)) + + def _EH_janus_videoroom_event_leaving(self, event): + # this is a publisher + publisher_id = event.plugindata.data.leaving # publisher_id == 'ok' when the event is about ourselves leaving the room, else the publisher's janus ID + try: + base_session = self.videoroom_sessions[event.sender] + except KeyError: + if publisher_id != 'ok': + self.log.warning('could not find video room session with handle ID {event.sender} for leaving event'.format(event=event)) + return + if publisher_id == 'ok': + self.log.info('left video room {session.room.uri} with session {session.id}'.format(session=base_session)) + self._cleanup_videoroom_session(base_session) + return + try: + publisher_session = base_session.feeds.pop(publisher_id) + except KeyError: + return + self.send(sylkrtc.VideoroomPublishersLeftEvent(session=base_session.id, publishers=[publisher_session.id])) + + def _EH_janus_videoroom_event_left(self, event): + # this is a subscriber + pass + + def _EH_janus_videoroom_event_configured(self, event): + pass + + def _EH_janus_videoroom_event_started(self, event): + pass + + def _EH_janus_videoroom_event_unpublished(self, event): + pass diff --git a/sylk/applications/webrtcgateway/janus.py b/sylk/applications/webrtcgateway/janus.py index 4dcd917..82ed832 100644 --- a/sylk/applications/webrtcgateway/janus.py +++ b/sylk/applications/webrtcgateway/janus.py @@ -1,265 +1,356 @@ import json -import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from autobahn.twisted.websocket import connectWS, WebSocketClientFactory, WebSocketClientProtocol +from eventlib.twistedutil import block_on from twisted.internet import reactor, defer from twisted.internet.protocol import ReconnectingClientFactory from twisted.python.failure import Failure from zope.interface import implements -from sylk import __version__ as SYLK_VERSION -from sylk.applications.webrtcgateway.configuration import JanusConfig -from sylk.applications.webrtcgateway.logger import log - - -class Request(object): - def __init__(self, request_type, **kw): - self.janus = request_type - self.transaction = uuid.uuid4().hex - if JanusConfig.api_secret: - self.apisecret = JanusConfig.api_secret - conflicting_keywords = set(self.__dict__).intersection(kw) - if conflicting_keywords: - raise KeyError('request specified keyword arguments that are already in use: {}'.format(', '.join(sorted(conflicting_keywords)))) - self.__dict__.update(**kw) - - @property - def type(self): - return self.janus - - @property - def transaction_id(self): - return self.transaction - - def as_dict(self): - return self.__dict__.copy() +from sylk import __version__ +from .configuration import JanusConfig +from .logger import log +from .models import janus class JanusError(Exception): def __init__(self, code, reason): super(JanusError, self).__init__(reason) self.code = code self.reason = reason class JanusClientProtocol(WebSocketClientProtocol): _event_handlers = None _pending_transactions = None _keepalive_timers = None _keepalive_interval = 45 notification_center = NotificationCenter() def onOpen(self): self.notification_center.post_notification('JanusBackendConnected', sender=self) self._pending_transactions = {} self._keepalive_timers = {} self._event_handlers = {} def onMessage(self, payload, isBinary): if isBinary: log.warn('Unexpected binary payload received') return + self.notification_center.post_notification('WebRTCJanusTrace', sender=self, data=NotificationData(direction='INCOMING', message=payload, peer=self.peer)) + try: - data = json.loads(payload) + message = janus.JanusMessage.from_payload(json.loads(payload)) except Exception as e: - log.warn('Error decoding payload: %s' % e) - return - try: - message_type = data.pop('janus') - except KeyError: - log.warn('Received payload lacks message type: %s' % payload) + log.warning('Error decoding Janus message: {!s}'.format(e)) return - transaction_id = data.pop('transaction', None) - if message_type == 'event' or transaction_id is None: - # This is an event. Janus is not very consistent here, some 'events' - # do have the transaction id set. So we check for the message type as well. - handle_id = data.pop('sender', -1) - handler = self._event_handlers.get(handle_id, Null) + + if isinstance(message, (janus.CoreEvent, janus.PluginEvent)): + # some of the plugin events might have the transaction, but we do not finalize + # the transaction for them as they are not direct responses for the transaction + handler = self._event_handlers.get(message.sender, Null) try: - handler(handle_id, message_type, data) - except Exception: - log.exception() + handler(message) + except Exception as e: + log.exception('Error while running Janus event handler: {!s}'.format(e)) return + + # at this point it can only be a response. clear the transaction and return the answer. try: - request, deferred = self._pending_transactions.pop(transaction_id) + request, deferred = self._pending_transactions.pop(message.transaction) except KeyError: log.warn('Discarding unexpected response: %s' % payload) return - # events were handled above, so the only message types we get here are ack, success and error - # todo: some plugin errors are delivered with message_type == 'success' and the error code is buried somewhere in plugindata - if message_type == 'error': - code = data['error']['code'] - reason = data['error']['reason'] - deferred.errback(JanusError(code, reason)) - elif message_type == 'ack': + + if isinstance(message, janus.AckResponse): deferred.callback(None) - else: # success - # keepalive and trickle only receive an ACK, thus are handled above in message_type == 'ack', not here - if request.type in ('create', 'attach'): - result = data['data']['id'] - elif request.type in ('destroy', 'detach'): - result = None - else: # info, message (for synchronous message requests only) - result = data - deferred.callback(result) + elif isinstance(message, janus.SuccessResponse): + deferred.callback(message) + elif isinstance(message, janus.ErrorResponse): + deferred.errback(JanusError(message.error.code, message.error.reason)) + else: + assert isinstance(message, janus.PluginResponse) + plugin_data = message.plugindata.data + if isinstance(plugin_data, (janus.SIPErrorEvent, janus.VideoroomErrorEvent)): + deferred.errback(JanusError(plugin_data.error_code, plugin_data.error)) + else: + deferred.callback(message) def connectionLost(self, reason): super(JanusClientProtocol, self).connectionLost(reason) self.notification_center.post_notification('JanusBackendDisconnected', sender=self, data=NotificationData(reason=reason.getErrorMessage())) def disconnect(self, code=1000, reason=u''): self.sendClose(code, reason) def _send_request(self, request): + if request.janus != 'keepalive' and 'session_id' in request: # postpone keepalive messages as long as we have non-keepalive traffic for a given session + keepalive_timer = self._keepalive_timers.get(request.session_id, None) + if keepalive_timer is not None and keepalive_timer.active(): + keepalive_timer.reset(self._keepalive_interval) deferred = defer.Deferred() - data = json.dumps(request.as_dict()) - if request.type != 'keepalive' and 'session_id' in data: # postpone keepalive messages as long as we have non-keepalive traffic for a given session - keepalive_timer = self._keepalive_timers.get(request.session_id, Null) - keepalive_timer.reset(self._keepalive_interval) - self.notification_center.post_notification('WebRTCJanusTrace', sender=self, data=NotificationData(direction='OUTGOING', message=data, peer=self.peer)) - self.sendMessage(data) - self._pending_transactions[request.transaction_id] = (request, deferred) + message = json.dumps(request.__data__) + self.notification_center.post_notification('WebRTCJanusTrace', sender=self, data=NotificationData(direction='OUTGOING', message=message, peer=self.peer)) + self.sendMessage(message) + self._pending_transactions[request.transaction] = request, deferred return deferred - def _start_keepalive(self, session_id): + def _start_keepalive(self, response): + session_id = response.data.id self._keepalive_timers[session_id] = reactor.callLater(self._keepalive_interval, self._send_keepalive, session_id) - return session_id + return response def _stop_keepalive(self, session_id): timer = self._keepalive_timers.pop(session_id, None) if timer is not None and timer.active(): timer.cancel() def _send_keepalive(self, session_id): - deferred = self._send_request(Request('keepalive', session_id=session_id)) + deferred = self._send_request(janus.SessionKeepaliveRequest(session_id=session_id)) deferred.addBoth(self._keepalive_callback, session_id) def _keepalive_callback(self, result, session_id): if isinstance(result, Failure): self._keepalive_timers.pop(session_id) else: self._keepalive_timers[session_id] = reactor.callLater(self._keepalive_interval, self._send_keepalive, session_id) # Public API def set_event_handler(self, handle_id, event_handler): if event_handler is None: self._event_handlers.pop(handle_id, None) else: assert callable(event_handler) self._event_handlers[handle_id] = event_handler def info(self): - return self._send_request(Request('info')) + return self._send_request(janus.InfoRequest()) def create_session(self): - return self._send_request(Request('create')).addCallback(self._start_keepalive) + return self._send_request(janus.SessionCreateRequest()).addCallback(self._start_keepalive) def destroy_session(self, session_id): self._stop_keepalive(session_id) - return self._send_request(Request('destroy', session_id=session_id)) + return self._send_request(janus.SessionDestroyRequest(session_id=session_id)) - def attach(self, session_id, plugin): - return self._send_request(Request('attach', session_id=session_id, plugin=plugin)) + def attach_plugin(self, session_id, plugin): + return self._send_request(janus.PluginAttachRequest(session_id=session_id, plugin=plugin)) - def detach(self, session_id, handle_id): - return self._send_request(Request('detach', session_id=session_id, handle_id=handle_id)) + def detach_plugin(self, session_id, handle_id): + return self._send_request(janus.PluginDetachRequest(session_id=session_id, handle_id=handle_id)) def message(self, session_id, handle_id, body, jsep=None): - extra_kw = {} if jsep is None else {'jsep': jsep} - return self._send_request(Request('message', session_id=session_id, handle_id=handle_id, body=body, **extra_kw)) + if jsep is not None: + return self._send_request(janus.MessageRequest(session_id=session_id, handle_id=handle_id, body=body, jsep=jsep)) + else: + return self._send_request(janus.MessageRequest(session_id=session_id, handle_id=handle_id, body=body)) def trickle(self, session_id, handle_id, candidates): - if candidates: - candidates_kw = {'candidate': candidates[0]} if len(candidates) == 1 else {'candidates': candidates} # todo: why handle single candidate differently? - else: - candidates_kw = {'candidate': {'completed': True}} - return self._send_request(Request('trickle', session_id=session_id, handle_id=handle_id, **candidates_kw)) + return self._send_request(janus.TrickleRequest(session_id=session_id, handle_id=handle_id, candidates=candidates)) class JanusClientFactory(ReconnectingClientFactory, WebSocketClientFactory): noisy = False protocol = JanusClientProtocol class JanusBackend(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): - self.factory = JanusClientFactory(url=JanusConfig.api_url, protocols=['janus-protocol'], useragent='SylkServer/%s' % SYLK_VERSION) + self.factory = JanusClientFactory(url=JanusConfig.api_url, protocols=['janus-protocol'], useragent='SylkServer/%s' % __version__) self.connector = None self.protocol = Null self._stopped = False @property def ready(self): return self.protocol is not Null def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='JanusBackendConnected') notification_center.add_observer(self, name='JanusBackendDisconnected') self.connector = connectWS(self.factory) def stop(self): if self._stopped: return self._stopped = True self.factory.stopTrying() notification_center = NotificationCenter() notification_center.discard_observer(self, name='JanusBackendConnected') notification_center.discard_observer(self, name='JanusBackendDisconnected') if self.connector is not None: self.connector.disconnect() self.connector = None if self.protocol is not None: self.protocol.disconnect() self.protocol = Null def set_event_handler(self, handle_id, event_handler): self.protocol.set_event_handler(handle_id, event_handler) def info(self): return self.protocol.info() def create_session(self): return self.protocol.create_session() def destroy_session(self, session_id): return self.protocol.destroy_session(session_id) - def attach(self, session_id, plugin): - return self.protocol.attach(session_id, plugin) + def attach_plugin(self, session_id, plugin): + return self.protocol.attach_plugin(session_id, plugin) - def detach(self, session_id, handle_id): - return self.protocol.detach(session_id, handle_id) + def detach_plugin(self, session_id, handle_id): + return self.protocol.detach_plugin(session_id, handle_id) def message(self, session_id, handle_id, body, jsep=None): return self.protocol.message(session_id, handle_id, body, jsep) def trickle(self, session_id, handle_id, candidates): return self.protocol.trickle(session_id, handle_id, candidates) # Notification handling def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_JanusBackendConnected(self, notification): assert self.protocol is Null self.protocol = notification.sender log.info('Janus backend connection up') self.factory.resetDelay() def _NH_JanusBackendDisconnected(self, notification): log.info('Janus backend connection down: %s' % notification.data.reason) self.protocol = Null + + +class JanusSession(object): + backend = JanusBackend() + + def __init__(self): + response = block_on(self.backend.create_session()) # type: janus.SuccessResponse + self.id = response.data.id + + def destroy(self): + return self.backend.destroy_session(self.id) + + +class JanusPluginHandle(object): + backend = JanusBackend() + plugin = None + + def __init__(self, session, event_handler): + if self.plugin is None: + raise TypeError('Cannot instantiate {0.__class__.__name__} with no associated plugin'.format(self)) + response = block_on(self.backend.attach_plugin(session.id, self.plugin)) # type: janus.SuccessResponse + self.id = response.data.id + self.session = session + self.backend.set_event_handler(self.id, event_handler) + + def __enter__(self): + return self + + def __exit__(self, exception_type, exception_value, traceback): + self.detach() + + def detach(self): + try: + block_on(self.backend.detach_plugin(self.session.id, self.id)) + except JanusError as e: + log.warning('could not detach Janus plugin: %s', e) + self.backend.set_event_handler(self.id, None) + + def message(self, body, jsep=None, async=False): + deferred = self.backend.message(self.session.id, self.id, body, jsep) + return deferred if async else block_on(deferred) + + def trickle(self, candidates, async=False): + deferred = self.backend.trickle(self.session.id, self.id, candidates) + return deferred if async else block_on(deferred) + + +class GenericPluginHandle(JanusPluginHandle): + def __init__(self, plugin, session, event_handler): + self.plugin = plugin + super(GenericPluginHandle, self).__init__(session, event_handler) + + +class SIPPluginHandle(JanusPluginHandle): + plugin = 'janus.plugin.sip' + + def register(self, account, proxy=None): + self.message(janus.SIPRegister(proxy=proxy, **account.user_data)) + + def unregister(self): + self.message(janus.SIPUnregister()) + + def call(self, account, uri, sdp, proxy=None): + # in order to make a call we need to register first. do so without actually registering, as we are already registered + self.message(janus.SIPRegister(proxy=proxy, send_register=False, **account.user_data)) + self.message(janus.SIPCall(uri=uri, srtp='sdes_optional'), jsep=janus.SDPOffer(sdp=sdp)) + + def accept(self, sdp): + self.message(janus.SIPAccept(), jsep=janus.SDPAnswer(sdp=sdp)) + + def decline(self, code=486): + self.message(janus.SIPDecline(code=code)) + + def hangup(self): + self.message(janus.SIPHangup()) + + +class VideoroomPluginHandle(JanusPluginHandle): + plugin = 'janus.plugin.videoroom' + + def create(self, room, config, publishers=10): + self.message(janus.VideoroomCreate(room=room, publishers=publishers, **config.janus_data)) + + def destroy(self, room): + try: + self.message(janus.VideoroomDestroy(room=room)) + except JanusError as e: + log.warning('could not destroy video room %s: %s', room.id, e) + + def join(self, room, sdp, display_name=None): + if display_name: + self.message(janus.VideoroomJoin(room=room, display=display_name), jsep=janus.SDPOffer(sdp=sdp)) + else: + self.message(janus.VideoroomJoin(room=room), jsep=janus.SDPOffer(sdp=sdp)) + + def leave(self): + self.message(janus.VideoroomLeave()) + + def update_publisher(self, options): + self.message(janus.VideoroomUpdatePublisher(**options)) + + def feed_attach(self, room, feed): + self.message(janus.VideoroomFeedAttach(room=room, feed=feed)) + + def feed_detach(self): + self.message(janus.VideoroomFeedDetach()) + + def feed_start(self, sdp): + self.message(janus.VideoroomFeedStart(), jsep=janus.SDPAnswer(sdp=sdp)) + + def feed_pause(self): + self.message(janus.VideoroomFeedPause()) + + def feed_resume(self): + self.message(janus.VideoroomFeedStart()) + + def feed_update(self, options): + self.message(janus.VideoroomFeedUpdate(**options)) diff --git a/sylk/applications/webrtcgateway/models/janus.py b/sylk/applications/webrtcgateway/models/janus.py new file mode 100644 index 0000000..b699bc6 --- /dev/null +++ b/sylk/applications/webrtcgateway/models/janus.py @@ -0,0 +1,869 @@ + +import os + +from application.python import subclasses +from application.python.descriptor import classproperty +from binascii import b2a_hex as hex_encode +from typing import Union + +from ..configuration import JanusConfig +from .jsonobjects import AbstractProperty, BooleanProperty, IntegerProperty, StringProperty, ArrayProperty, ObjectProperty +from .jsonobjects import FixedValueProperty, LimitedChoiceProperty, AbstractObjectProperty, JSONObject, JSONArray +from .sylkrtc import ICECandidates +from .validators import URIValidator + + +# Base models (these are abstract and should not be used directly) + +class JanusRequestBase(JSONObject): + transaction = StringProperty() + apisecret = FixedValueProperty(JanusConfig.api_secret or None) + + def __init__(self, **kw): + if 'transaction' not in kw: + kw['transaction'] = hex_encode(os.urandom(16)) # uuid4().hex is really slow + super(JanusRequestBase, self).__init__(**kw) + + +class JanusCoreMessageBase(JSONObject): # base class for messages/ sent by the Janus core (responses and events) + pass + + +class JanusPluginMessageBase(JSONObject): # base class for messages sent by the Janus plugins (responses and events) + pass + + +class PluginDataBase(JSONObject): + pass + + +class SIPPluginData(PluginDataBase): + __plugin__ = 'sip' + + # noinspection PyMethodParameters,PyUnresolvedReferences + @classproperty + def __id__(cls): + if cls is SIPErrorEvent: + return cls.__plugin__, cls.sip.value, cls.error.name + else: + return cls.__plugin__, cls.sip.value, cls.result.object_type.event.value + + +class VideoroomPluginData(PluginDataBase): + __plugin__ = 'videoroom' + __event__ = None # for events, the name of the property that describes the event + + # noinspection PyMethodParameters,PyUnresolvedReferences + @classproperty + def __id__(cls): + if cls.__event__ is not None: + return cls.__plugin__, cls.videoroom.value, cls.__event__ + else: + return cls.__plugin__, cls.videoroom.value + + +class CoreResponse(JanusCoreMessageBase): + transaction = StringProperty() + + +class CoreEvent(JanusCoreMessageBase): + session_id = IntegerProperty() + sender = IntegerProperty() + + +# Miscellaneous models + +class NumericId(JSONObject): + id = IntegerProperty() + + +class UserId(JSONObject): + id = IntegerProperty() + display = StringProperty(optional=True) + + +class ErrorInfo(JSONObject): + code = IntegerProperty() + reason = StringProperty() + + +class PluginDataContainer(JSONObject): + plugin = StringProperty() + data = AbstractObjectProperty() # type: Union[SIPPluginData, VideoroomPluginData] + + +class SIPDataContainer(JSONObject): + sip = FixedValueProperty('event') + result = AbstractObjectProperty() + + +class SDPOffer(JSONObject): + type = FixedValueProperty('offer') + sdp = StringProperty() + + +class SDPAnswer(JSONObject): + type = FixedValueProperty('answer') + sdp = StringProperty() + + +class JSEP(JSONObject): + type = LimitedChoiceProperty(['offer', 'answer']) + sdp = StringProperty() + + +class VideoroomPublisher(JSONObject): + id = IntegerProperty() + display = StringProperty(optional=True) + audio_codec = StringProperty(optional=True) + video_codec = StringProperty(optional=True) + talking = BooleanProperty(optional=True) + + +class VideoroomPublishers(JSONArray): + item_type = VideoroomPublisher + + +# Janus requests + +class InfoRequest(JanusRequestBase): + janus = FixedValueProperty('info') + + +class SessionCreateRequest(JanusRequestBase): + janus = FixedValueProperty('create') + + +class SessionDestroyRequest(JanusRequestBase): + janus = FixedValueProperty('destroy') + session_id = IntegerProperty() + + +class SessionKeepaliveRequest(JanusRequestBase): + janus = FixedValueProperty('keepalive') + session_id = IntegerProperty() + + +class PluginAttachRequest(JanusRequestBase): + janus = FixedValueProperty('attach') + session_id = IntegerProperty() + plugin = StringProperty() + + +class PluginDetachRequest(JanusRequestBase): + janus = FixedValueProperty('detach') + session_id = IntegerProperty() + handle_id = IntegerProperty() + + +class MessageRequest(JanusRequestBase): + janus = FixedValueProperty('message') + session_id = IntegerProperty() + handle_id = IntegerProperty() + body = AbstractObjectProperty() + jsep = AbstractObjectProperty(optional=True) + + +class TrickleRequest(JanusRequestBase): + janus = FixedValueProperty('trickle') + session_id = IntegerProperty() + handle_id = IntegerProperty() + candidates = ArrayProperty(ICECandidates) # type: ICECandidates + + @property + def __data__(self): + data = super(TrickleRequest, self).__data__ + candidates = data.pop('candidates', []) + if len(candidates) == 0: + data['candidate'] = {'completed': True} + # data['candidate'] = None + elif len(candidates) == 1: + data['candidate'] = candidates[0] + else: + data['candidates'] = candidates + return data + + +# SIP plugin messages (to be used as body for MessageRequest for the SIP plugin messages) + +class SIPRegister(JSONObject): + request = FixedValueProperty('register') + username = StringProperty() # this is the account.uri which is already a valid URI + ha1_secret = StringProperty() + display_name = StringProperty(optional=True) + user_agent = StringProperty(optional=True) + proxy = StringProperty(optional=True) + send_register = BooleanProperty(optional=True) + + +class SIPUnregister(JSONObject): + request = FixedValueProperty('unregister') + + +class SIPCall(JSONObject): + request = FixedValueProperty('call') + uri = StringProperty(validator=URIValidator()) # this comes from the client request.uri which was validated as an AOR and we need a URI + srtp = LimitedChoiceProperty(['sdes_optional', 'sdes_mandatory'], optional=True) + + +class SIPAccept(JSONObject): + request = FixedValueProperty('accept') + + +class SIPDecline(JSONObject): + request = FixedValueProperty('decline') + code = IntegerProperty() + + +class SIPHangup(JSONObject): + request = FixedValueProperty('hangup') + + +# Videoroom plugin messages (to be used as body for MessageRequest for videoroom plugin messages) + +class VideoroomCreate(JSONObject): + request = FixedValueProperty('create') + room = IntegerProperty() + pin = StringProperty(optional=True) + description = StringProperty(optional=True) + publishers = IntegerProperty(optional=True, default=10) + videocodec = StringProperty(optional=True) # don't need a validator, as the value comes from the configuration and it's validated there + bitrate = IntegerProperty(optional=True) # don't need a validator, as the value comes from the configuration and it's validated there + record = BooleanProperty(optional=True, default=False) + rec_dir = StringProperty(optional=True) + + +class VideoroomDestroy(JSONObject): + request = FixedValueProperty('destroy') + room = IntegerProperty() + + +class VideoroomJoin(JSONObject): + request = FixedValueProperty('joinandconfigure') + ptype = FixedValueProperty('publisher') + room = IntegerProperty() + display = StringProperty(optional=True) + audio = BooleanProperty(optional=True, default=True) + video = BooleanProperty(optional=True, default=True) + bitrate = IntegerProperty(optional=True) + + +class VideoroomLeave(JSONObject): + request = FixedValueProperty('leave') + + +class VideoroomUpdatePublisher(JSONObject): + request = FixedValueProperty('configure') + audio = BooleanProperty(optional=True) + video = BooleanProperty(optional=True) + bitrate = IntegerProperty(optional=True) + + +class VideoroomFeedAttach(JSONObject): + request = FixedValueProperty('join') + ptype = FixedValueProperty('listener') + room = IntegerProperty() + feed = IntegerProperty() + + +class VideoroomFeedDetach(JSONObject): + request = FixedValueProperty('leave') + + +class VideoroomFeedStart(JSONObject): + request = FixedValueProperty('start') + + +class VideoroomFeedPause(JSONObject): + request = FixedValueProperty('pause') + + +class VideoroomFeedUpdate(JSONObject): + request = FixedValueProperty('configure') + audio = BooleanProperty(optional=True) + video = BooleanProperty(optional=True) + substream = IntegerProperty(optional=True) # for VP8 simulcast + temporal = IntegerProperty(optional=True) # for VP8 simulcast + spatial_layer = IntegerProperty(optional=True) # for VP9 SVC + temporal_layer = IntegerProperty(optional=True) # for VP9 SVC + + +# Janus core messages + +class AckResponse(CoreResponse): + janus = FixedValueProperty('ack') + session_id = IntegerProperty() + + +class ErrorResponse(CoreResponse): + janus = FixedValueProperty('error') + session_id = IntegerProperty(optional=True) # not used + error = ObjectProperty(ErrorInfo) # type: ErrorInfo + + +class SuccessResponse(CoreResponse): + janus = FixedValueProperty('success') + session_id = IntegerProperty(optional=True) # not used + data = ObjectProperty(NumericId, optional=True) # type: NumericId + + +class ServerInfoResponse(CoreResponse): + janus = FixedValueProperty('server_info') + name = StringProperty() + version = IntegerProperty() + version_string = StringProperty() + data_channels = BooleanProperty() + + +class DetachedEvent(CoreEvent): + janus = FixedValueProperty('detached') + + +class HangupEvent(CoreEvent): + janus = FixedValueProperty('hangup') + reason = StringProperty(optional=True) + + +class MediaEvent(CoreEvent): + janus = FixedValueProperty('media') + type = LimitedChoiceProperty(['audio', 'video']) + receiving = BooleanProperty() + seconds = IntegerProperty(optional=True) + + +class SlowlinkEvent(CoreEvent): + janus = FixedValueProperty('slowlink') + uplink = BooleanProperty() + nacks = IntegerProperty() + + +class WebrtcUpEvent(CoreEvent): + janus = FixedValueProperty('webrtcup') + + +# Janus plugin messages + +class PluginResponse(JanusPluginMessageBase): + janus = FixedValueProperty('success') + transaction = StringProperty() + session_id = IntegerProperty() + sender = IntegerProperty() + plugindata = ObjectProperty(PluginDataContainer) # type: PluginDataContainer + + +class PluginEvent(JanusPluginMessageBase): + janus = FixedValueProperty('event') + transaction = StringProperty(optional=True) + session_id = IntegerProperty() + sender = IntegerProperty() + plugindata = ObjectProperty(PluginDataContainer) # type: PluginDataContainer + jsep = ObjectProperty(JSEP, optional=True) # type: JSEP + + +# SIP plugin data messages + +# Results + +class SIPResultRegistering(JSONObject): + event = FixedValueProperty('registering') + + +class SIPResultRegistered(JSONObject): + event = FixedValueProperty('registered') + username = StringProperty() # not used + register_sent = BooleanProperty() + + +class SIPResultRegistrationFailed(JSONObject): + event = FixedValueProperty('registration_failed') + code = IntegerProperty() + reason = StringProperty() + + +class SIPResultUnregistering(JSONObject): + event = FixedValueProperty('unregistering') + + +class SIPResultUnregistered(JSONObject): + event = FixedValueProperty('unregistered') + username = StringProperty() # not used + + +class SIPResultCalling(JSONObject): + event = FixedValueProperty('calling') + + +class SIPResultRinging(JSONObject): + event = FixedValueProperty('ringing') + + +class SIPResultProceeding(JSONObject): + event = FixedValueProperty('proceeding') + code = IntegerProperty() + + +class SIPResultProgress(JSONObject): + event = FixedValueProperty('progress') + username = StringProperty() # not used + + +class SIPResultDeclining(JSONObject): + event = FixedValueProperty('declining') + code = IntegerProperty() + + +class SIPResultAccepting(JSONObject): + event = FixedValueProperty('accepting') + + +class SIPResultAccepted(JSONObject): + event = FixedValueProperty('accepted') + username = StringProperty(optional=True) # not used (only present for outgoing) + + +class SIPResultHolding(JSONObject): + event = FixedValueProperty('holding') + + +class SIPResultResuming(JSONObject): + event = FixedValueProperty('resuming') + + +class SIPResultHangingUp(JSONObject): + event = FixedValueProperty('hangingup') + + +class SIPResultHangup(JSONObject): + event = FixedValueProperty('hangup') + code = IntegerProperty() + reason = StringProperty() + + +class SIPResultIncomingCall(JSONObject): + event = FixedValueProperty('incomingcall') + username = StringProperty() + displayname = StringProperty(optional=True) + srtp = LimitedChoiceProperty(['sdes_optional', 'sdes_mandatory'], optional=True) + + +class SIPResultMissedCall(JSONObject): + event = FixedValueProperty('missed_call') + caller = StringProperty() + displayname = StringProperty(optional=True) + + +class SIPResultInfo(JSONObject): + event = FixedValueProperty('info') + sender = StringProperty() + displayname = StringProperty(optional=True) + type = StringProperty() + content = StringProperty() + + +class SIPResultInfoSent(JSONObject): + event = FixedValueProperty('infosent') + + +class SIPResultMessage(JSONObject): + event = FixedValueProperty('message') + sender = StringProperty() + displayname = StringProperty(optional=True) + content = StringProperty() + + +class SIPResultMessageSent(JSONObject): + event = FixedValueProperty('messagesent') + + +class SIPResultDTMFSent(JSONObject): + event = FixedValueProperty('dtmfsent') + + +# Data messages + +class SIPErrorEvent(SIPPluginData): + sip = FixedValueProperty('event') + error_code = IntegerProperty() + error = StringProperty() + + +class SIPRegisteringEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultRegistering) # type: SIPResultRegistering + + +class SIPRegisteredEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultRegistered) # type: SIPResultRegistered + + +class SIPRegistrationFailedEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultRegistrationFailed) # type: SIPResultRegistrationFailed + + +class SIPUnregisteringEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultUnregistering) # type: SIPResultUnregistering + + +class SIPUnregisteredEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultUnregistered) # type: SIPResultRegistered + + +class SIPCallingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultCalling) # type: SIPResultCalling + + +class SIPRingingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultRinging) # type: SIPResultRinging + + +class SIPProceedingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultProceeding) # type: SIPResultProceeding + + +class SIPProgressEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultProgress) # type: SIPResultProgress + + +class SIPDecliningEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultDeclining) # type: SIPResultDeclining + + +class SIPAcceptingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultAccepting) # type: SIPResultAccepting + + +class SIPAcceptedEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultAccepted) # type: SIPResultAccepted + + +class SIPHoldingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultHolding) # type: SIPResultHolding + + +class SIPResumingEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultResuming) # type: SIPResultResuming + + +class SIPHangingUpEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultHangingUp) # type: SIPResultHangingUp + + +class SIPHangupEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultHangup) # type: SIPResultHangup + + +class SIPIncomingCallEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultIncomingCall) # type: SIPResultIncomingCall + + +class SIPMissedCallEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultMissedCall) # type: SIPResultMissedCall + + +class SIPInfoEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultInfo) # type: SIPResultInfo + + +class SIPInfoSentEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultInfoSent) # type: SIPResultInfoSent + + +class SIPMessageEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultMessage) # type: SIPResultMessage + + +class SIPMessageSentEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultMessageSent) # type: SIPResultMessageSent + + +class SIPDTMFSentEvent(SIPPluginData): + sip = FixedValueProperty('event') + result = ObjectProperty(SIPResultDTMFSent) # type: SIPResultDTMFSent + + +# Videoroom plugin data messages + +class VideoroomCreated(VideoroomPluginData): + videoroom = FixedValueProperty('created') + room = IntegerProperty() + # permanent = BooleanProperty() # this is not available in older janus versions. not used. + + +class VideoroomEdited(VideoroomPluginData): + videoroom = FixedValueProperty('edited') + room = IntegerProperty() + # permanent = BooleanProperty() # this is not available in older janus versions. not used. + + +class VideoroomDestroyed(VideoroomPluginData): # this comes both in the response to 'destroy' and in an event to participants still in the room when destroyed (if any) + videoroom = FixedValueProperty('destroyed') + room = IntegerProperty() + # permanent = BooleanProperty(optional=True) # this is not available in older janus versions (only present in the response, but not in the event). not used. + + +class VideoroomJoined(VideoroomPluginData): + videoroom = FixedValueProperty('joined') + room = IntegerProperty() + description = StringProperty() + id = IntegerProperty() + publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers + # private_id = IntegerProperty() # this is not available in older janus versions. not used. + + +class VideoroomAttached(VideoroomPluginData): + videoroom = FixedValueProperty('attached') + room = IntegerProperty() + id = IntegerProperty() + display = StringProperty(optional=True) + + +class VideoroomSlowLink(VideoroomPluginData): + videoroom = FixedValueProperty('slow_link') + # current_bitrate = IntegerProperty() # this is actually defined as 'current-bitrate' in JSON, so we cannot map it to an attribute name. also not used. + + +class VideoroomErrorEvent(VideoroomPluginData): + __event__ = 'error' + + videoroom = FixedValueProperty('event') + error_code = IntegerProperty() + error = StringProperty() + + +class VideoroomStartedEvent(VideoroomPluginData): + __event__ = 'started' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + started = FixedValueProperty('ok') + + +class VideoroomPublishersEvent(VideoroomPluginData): + __event__ = 'publishers' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + publishers = ArrayProperty(VideoroomPublishers) # type: VideoroomPublishers + + +class VideoroomConfiguredEvent(VideoroomPluginData): + __event__ = 'configured' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + configured = FixedValueProperty('ok') + + +class VideoroomLeftEvent(VideoroomPluginData): + __event__ = 'left' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + left = FixedValueProperty('ok') + + +class VideoroomLeavingEvent(VideoroomPluginData): + __event__ = 'leaving' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + leaving = AbstractProperty() # this is either a participant id or the string "ok" + reason = StringProperty(optional=True) + + +class VideoroomKickedEvent(VideoroomPluginData): + __event__ = 'kicked' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + kicked = IntegerProperty() + + +class VideoroomUnpublishedEvent(VideoroomPluginData): + __event__ = 'unpublished' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + unpublished = AbstractProperty() # this is either a participant id or the string "ok" + + +class VideoroomPausedEvent(VideoroomPluginData): + __event__ = 'paused' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + paused = FixedValueProperty('ok') + + +class VideoroomSwitchedEvent(VideoroomPluginData): + __event__ = 'switched' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + id = IntegerProperty() + switched = FixedValueProperty('ok') + + +class VideoroomJoiningEvent(VideoroomPluginData): # only sent if room has notify_joining == True (default is False). Can be used to monitor non-publishers. + __event__ = 'joining' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + joining = ObjectProperty(UserId) # type: UserId + + +class VideoroomDisplayEvent(VideoroomPluginData): # participant display name change + __event__ = 'display' + + videoroom = FixedValueProperty('event') + id = IntegerProperty() + display = StringProperty() + + +class VideoroomSubstreamEvent(VideoroomPluginData): # simulcast substream change + __event__ = 'substream' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + substream = IntegerProperty() + + +class VideoroomTemporalEvent(VideoroomPluginData): # simulcast temporal layer change + __event__ = 'temporal' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + temporal = IntegerProperty() + + +class VideoroomSpatialLayerEvent(VideoroomPluginData): # SVC spatial layer change + __event__ = 'spatial_layer' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + spatial_layer = IntegerProperty() + + +class VideoroomTemporalLayerEvent(VideoroomPluginData): # SVC temporal layer change + __event__ = 'temporal_layer' + + videoroom = FixedValueProperty('event') + room = IntegerProperty() + temporal_layer = IntegerProperty() + + +# Janus message to model mapping + +class ProtocolError(Exception): + pass + + +class PluginDataHandler(object): + pass + + +class SIPDataHandler(PluginDataHandler): + __plugin__ = 'sip' + + __classmap__ = {cls.__id__: cls for cls in subclasses(SIPPluginData) if cls.__plugin__ in cls.__properties__} + + @classmethod + def decode(cls, data): + try: + sip = data[cls.__plugin__] + if 'error' in data: + data_key = cls.__plugin__, sip, 'error' + elif 'result' in data: + data_key = cls.__plugin__, sip, data['result']['event'] + else: + data_key = cls.__plugin__, sip + except KeyError: + raise ProtocolError('could not get {!s} plugin data type from {!r}'.format(cls.__plugin__, data)) + try: + data_class = cls.__classmap__[data_key] + except KeyError: + raise ProtocolError('unknown {!s} plugin data: {!r}'.format(cls.__plugin__, data)) + return data_class(**data) + + +class VideoroomDataHandler(PluginDataHandler): + __plugin__ = 'videoroom' + + __classmap__ = {cls.__id__: cls for cls in subclasses(VideoroomPluginData) if cls.__plugin__ in cls.__properties__} + __eventset__ = frozenset(cls.__event__ for cls in subclasses(VideoroomPluginData) if cls.__event__) + + @classmethod + def decode(cls, data): + try: + data_type = data[cls.__plugin__] + except KeyError: + raise ProtocolError('could not get {!s} plugin data type from {!r}'.format(cls.__plugin__, data)) + if data_type == 'event': + common_keys = list(cls.__eventset__.intersection(data)) + if len(common_keys) != 1: + raise ProtocolError('unknown {!s} plugin event: {!r}'.format(cls.__plugin__, data)) + event_type = common_keys[0] + data_key = cls.__plugin__, data_type, event_type + else: + data_key = cls.__plugin__, data_type + try: + data_class = cls.__classmap__[data_key] + except KeyError: + raise ProtocolError('unknown {!s} plugin data: {!r}'.format(cls.__plugin__, data)) + return data_class(**data) + + +class JanusMessage(object): + __core_classmap__ = {cls.janus.value: cls for cls in subclasses(JanusCoreMessageBase) if 'janus' in cls.__properties__} + __plugin_classmap__ = {cls.janus.value: cls for cls in subclasses(JanusPluginMessageBase) if 'janus' in cls.__properties__} + __plugin_handlers__ = {handler.__plugin__: handler for handler in subclasses(PluginDataHandler)} + + @classmethod + def from_payload(cls, payload): + try: + message_type = payload['janus'] + except KeyError: + raise ProtocolError('could not get Janus message type') + if 'plugindata' in payload: + try: + message_class = cls.__plugin_classmap__[message_type] + except KeyError: + raise ProtocolError('unknown Janus message: {!s}'.format(message_type)) + data_container = payload['plugindata'] + try: + plugin_name = data_container['plugin'].rpartition('.')[2] + plugin_data = data_container['data'] + except KeyError: + raise ProtocolError('invalid plugin data: {!r}'.format(data_container)) + try: + data_handler = cls.__plugin_handlers__[plugin_name] + except KeyError: + raise ProtocolError('could not find a data handler for {!r}'.format(plugin_data)) + data_container['data'] = data_handler.decode(plugin_data) + else: + try: + message_class = cls.__core_classmap__[message_type] + except KeyError: + raise ProtocolError('unknown Janus message: {!s}'.format(message_type)) + return message_class(**payload) diff --git a/sylk/applications/webrtcgateway/models/jsonobjects.py b/sylk/applications/webrtcgateway/models/jsonobjects.py index 9685d61..334d7bb 100644 --- a/sylk/applications/webrtcgateway/models/jsonobjects.py +++ b/sylk/applications/webrtcgateway/models/jsonobjects.py @@ -1,525 +1,537 @@ class Validator(object): def validate(self, value): """Check value and raise ValueError if invalid, else return the (possibly modified) value""" return value class CompositeValidator(Validator): def __init__(self, *validators): if len(validators) < 2: raise TypeError('need at least two validators to create a CompositeValidator') if not all(isinstance(validator, Validator) for validator in validators): raise ValueError('validators need to be Validator instances') self.validators = validators def validate(self, value): for validator in self.validators: value = validator.validate(value) return value class MultiType(tuple): """ A collection of types for which isinstance(obj, multi_type) returns True if 'obj' is an instance of any of the types in the multi_type. Instantiating the multi_type will instantiate the first type in the multi_type. """ # noinspection PyArgumentList def __new__(cls, *args): if not args: raise ValueError('{.__name__} must have at least one type'.format(cls)) instance = super(MultiType, cls).__new__(cls, args) instance.__name__ = ', '.join(cls.__name__ for cls in args) instance.main_type = args[0] return instance def __call__(self, value): return self.main_type(value) class AbstractProperty(object): data_type = object container = False def __init__(self, optional=False, default=None, validator=None): if validator is not None and not isinstance(validator, Validator): raise TypeError('validator should be a Validator instance or None') self.default = default self.optional = optional self.validator = validator self.name = None # will be set by the JSONObjectType metaclass when associating properties with objects def __get__(self, instance, owner): if instance is None: return self return instance.__dict__.get(self.name, self.default) # mandatory properties are guaranteed to be present, only optional ones can be missing def __set__(self, instance, value): if value is None and self.optional: instance.__dict__[self.name] = None else: instance.__dict__[self.name] = self._parse(value) def __delete__(self, instance): if not self.optional: raise AttributeError('Cannot delete mandatory property {property.name!r} of object {instance.__class__.__name__!r}'.format(instance=instance, property=self)) try: del instance.__dict__[self.name] except KeyError: raise AttributeError(self.name) def _parse(self, value): if not isinstance(value, self.data_type): raise ValueError('Invalid value for {property.name!r} property: {value!r}'.format(property=self, value=value)) if self.validator is not None: value = self.validator.validate(value) return value class BooleanProperty(AbstractProperty): data_type = bool class IntegerProperty(AbstractProperty): data_type = int, long class NumberProperty(AbstractProperty): data_type = int, long, float class StringProperty(AbstractProperty): data_type = str, unicode class FixedValueProperty(AbstractProperty): def __init__(self, value): super(FixedValueProperty, self).__init__(optional=True, default=value) self.value = value def _parse(self, value): if value != self.value: raise ValueError('Invalid value for {property.name!r} property: {value!r} (should be {property.value!r})'.format(property=self, value=value)) return value class LimitedChoiceProperty(AbstractProperty): def __init__(self, values, optional=False, default=None): if not values: raise ValueError('values needs to be an non-empty sequence of elements') if optional and default is not None and default not in values: raise ValueError('default value needs to be one of the allowed values or None') super(LimitedChoiceProperty, self).__init__(optional=optional, default=default) self.values = frozenset(values) self.values_string = ' or '.join(', '.join(sorted(values)).rsplit(', ', 1)) def _parse(self, value): if value not in self.values: raise ValueError('Invalid value for {property.name!r} property: {value!r} (expected: {property.values_string})'.format(property=self, value=value)) return value class ArrayProperty(AbstractProperty): data_type = list, tuple container = True def __init__(self, array_type, optional=False): if not issubclass(array_type, JSONArray): raise TypeError('array_type should be a subclass of JSONArray') super(ArrayProperty, self).__init__(optional=optional, default=None, validator=None) self.array_type = array_type def _parse(self, value): if type(value) is self.array_type: return value elif isinstance(value, self.data_type): return self.array_type(value) else: raise ValueError('Invalid value for {property.name!r} property: {value!r}'.format(property=self, value=value)) class ObjectProperty(AbstractProperty): data_type = dict container = True def __init__(self, object_type, optional=False): if not issubclass(object_type, JSONObject): raise TypeError('object_type should be a subclass of JSONObject') super(ObjectProperty, self).__init__(optional=optional, default=None, validator=None) self.object_type = object_type def _parse(self, value): if type(value) is self.object_type: return value elif isinstance(value, self.data_type): return self.object_type(**value) else: raise ValueError('Invalid value for {property.name!r} property: {value!r}'.format(property=self, value=value)) class PropertyContainer(object): def __init__(self, cls): self.__dict__.update({item.name: item for cls in reversed(cls.__mro__) for item in cls.__dict__.itervalues() if isinstance(item, AbstractProperty)}) def __getitem__(self, name): return self.__dict__[name] def __contains__(self, name): return name in self.__dict__ def __iter__(self): return self.__dict__.itervalues() @property def names(self): return set(self.__dict__) class JSONObjectType(type): # noinspection PyShadowingBuiltins def __init__(cls, name, bases, dictionary): super(JSONObjectType, cls).__init__(name, bases, dictionary) for name, property in ((name, item) for name, item in dictionary.iteritems() if isinstance(item, AbstractProperty)): property.name = name cls.__properties__ = PropertyContainer(cls) class JSONObject(object): __metaclass__ = JSONObjectType # noinspection PyShadowingBuiltins def __init__(self, **data): for property in self.__properties__: if property.name in data: property.__set__(self, data[property.name]) elif not property.optional: raise ValueError('Mandatory property {property.name!r} of {object.__class__.__name__!r} object is missing'.format(property=property, object=self)) # noinspection PyShadowingBuiltins @property def __data__(self): data = {} for property in self.__properties__: value = self.__dict__.get(property.name, property.default) if value is not None: data[property.name] = value.__data__ if property.container else value elif property.name in self.__dict__: data[property.name] = None return data def __contains__(self, name): return name in self.__properties__ class ArrayParser(object): def __init__(self, cls): self.item_type = MultiType(*cls.item_type) if isinstance(cls.item_type, (list, tuple)) else cls.item_type self.item_validator = cls.item_validator # this is only used for primitive item types if isinstance(self.item_type, JSONObjectType): self.parse_item = self.__parse_object_item self.parse_list = self.__parse_object_list elif isinstance(self.item_type, JSONArrayType): self.parse_item = self.__parse_array_item self.parse_list = self.__parse_array_list else: self.parse_item = self.__parse_primitive_item self.parse_list = self.__parse_primitive_list def __parse_primitive_item(self, item): if not isinstance(item, self.item_type): raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=self.item_type, item=item)) if self.item_validator is not None: item = self.item_validator.validate(item) return item def __parse_primitive_list(self, iterable): item_type = self.item_type for item in iterable: if not isinstance(item, item_type): raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=item_type, item=item)) if self.item_validator is not None: # note: can be optimized by moving this test outside the loop (not sure if the decreased readability is worth it) item = self.item_validator.validate(item) yield item def __parse_array_item(self, item): try: return item if type(item) is self.item_type else self.item_type(item) except TypeError: raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=self.item_type, item=item)) def __parse_array_list(self, iterable): item_type = self.item_type for item in iterable: try: yield item if type(item) is item_type else item_type(item) except TypeError: raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=item_type, item=item)) def __parse_object_item(self, item): try: return item if type(item) is self.item_type else self.item_type(**item) except TypeError: raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=self.item_type, item=item)) def __parse_object_list(self, iterable): item_type = self.item_type for item in iterable: try: yield item if type(item) is item_type else item_type(**item) except TypeError: raise ValueError('Invalid value for {type.__name__}: {item!r}'.format(type=item_type, item=item)) class JSONArrayType(type): item_type = object item_validator = None list_validator = None def __init__(cls, name, bases, dictionary): super(JSONArrayType, cls).__init__(name, bases, dictionary) if cls.item_validator is not None and isinstance(cls.item_type, (JSONArrayType, JSONObjectType)): raise TypeError('item_validator is not used for JSONArray and JSONObject item types as they have their own validators') if cls.item_validator is not None and not isinstance(cls.item_validator, Validator): raise TypeError('item_validator should be a Validator instance or None') if cls.list_validator is not None and not isinstance(cls.list_validator, Validator): raise TypeError('list_validator should be a Validator instance or None') cls.parser = ArrayParser(cls) class JSONArray(object): __metaclass__ = JSONArrayType item_type = object item_validator = None # this should only be defined for primitive item types list_validator = None def __init__(self, iterable): if isinstance(iterable, basestring): # prevent iterable primitive types from being interpreted as arrays raise ValueError('Invalid value for {.__class__.__name__}: {!r}'.format(self, iterable)) items = list(self.parser.parse_list(iterable)) if self.list_validator is not None: items = self.list_validator.validate(items) self.__items__ = items @property def __data__(self): return [item.__data__ for item in self.__items__] if isinstance(self.item_type, (JSONArrayType, JSONObjectType)) else self.__items__[:] def __repr__(self): return '{0.__class__.__name__}({0.__items__!r})'.format(self) def __contains__(self, item): return item in self.__items__ def __iter__(self): return iter(self.__items__) def __len__(self): return len(self.__items__) def __reversed__(self): return reversed(self.__items__) __hash__ = None def __getitem__(self, index): return self.__items__[index] def __setitem__(self, index, value): value = self.parser.parse_item(value) if self.list_validator is not None: clone = self.__items__[:] clone[index] = value self.__items__ = self.list_validator.validate(clone) else: self.__items__[index] = value def __delitem__(self, index): if self.list_validator is not None: clone = self.__items__[:] del clone[index] self.__items__ = self.list_validator.validate(clone) else: del self.__items__[index] def __getslice__(self, i, j): return self.__items__[i:j] def __setslice__(self, i, j, sequence): sequence = list(self.parser.parse_list(sequence)) if self.list_validator is not None: clone = self.__items__[:] clone[i:j] = sequence self.__items__ = self.list_validator.validate(clone) else: self.__items__[i:j] = sequence def __delslice__(self, i, j): if self.list_validator is not None: clone = self.__items__[:] del clone[i:j] self.__items__ = self.list_validator.validate(clone) else: del self.__items__[i:j] def __add__(self, other): if isinstance(other, JSONArray): return self.__class__(self.__items__ + other.__items__) else: return self.__class__(self.__items__ + other) def __radd__(self, other): if isinstance(other, JSONArray): return self.__class__(other.__items__ + self.__items__) else: return self.__class__(other + self.__items__) def __iadd__(self, other): if isinstance(other, JSONArray) and self.item_type == other.item_type: items = other.__items__ else: items = list(self.parser.parse_list(other)) if self.list_validator is not None: clone = self.__items__[:] clone += items self.__items__ = self.list_validator.validate(clone) else: self.__items__ += items return self def __mul__(self, n): return self.__class__(self.__items__ * n) def __rmul__(self, n): return self.__class__(self.__items__ * n) def __imul__(self, n): if self.list_validator is not None: self.__items__ = self.list_validator.validate(n * self.__items__) else: self.__items__ *= n return self def __eq__(self, other): return self.__items__ == other.__items__ if isinstance(other, JSONArray) else self.__items__ == other def __ne__(self, other): return self.__items__ != other.__items__ if isinstance(other, JSONArray) else self.__items__ != other def __lt__(self, other): return self.__items__ < other.__items__ if isinstance(other, JSONArray) else self.__items__ < other def __le__(self, other): return self.__items__ <= other.__items__ if isinstance(other, JSONArray) else self.__items__ <= other def __gt__(self, other): return self.__items__ > other.__items__ if isinstance(other, JSONArray) else self.__items__ > other def __ge__(self, other): return self.__items__ >= other.__items__ if isinstance(other, JSONArray) else self.__items__ >= other def __format__(self, format_spec): return self.__items__.__format__(format_spec) def index(self, value, *args): return self.__items__.index(value, *args) def count(self, value): return self.__items__.count(value) def append(self, value): value = self.parser.parse_item(value) if self.list_validator is not None: clone = self.__items__[:] clone.append(value) self.__items__ = self.list_validator.validate(clone) else: self.__items__.append(value) def insert(self, index, value): value = self.parser.parse_item(value) if self.list_validator is not None: clone = self.__items__[:] clone.insert(index, value) self.__items__ = self.list_validator.validate(clone) else: self.__items__.insert(index, value) def extend(self, other): if isinstance(other, JSONArray) and self.item_type == other.item_type: items = other.__items__ else: items = list(self.parser.parse_list(other)) if self.list_validator is not None: clone = self.__items__[:] clone.extend(items) self.__items__ = self.list_validator.validate(clone) else: self.__items__.extend(items) def pop(self, index=-1): if self.list_validator is not None: clone = self.__items__[:] clone.pop(index) self.__items__ = self.list_validator.validate(clone) else: self.__items__.pop(index) def remove(self, value): if self.list_validator is not None: clone = self.__items__[:] clone.remove(value) self.__items__ = self.list_validator.validate(clone) else: self.__items__.remove(value) def reverse(self): if self.list_validator is not None: clone = self.__items__[:] clone.reverse() self.__items__ = self.list_validator.validate(clone) else: self.__items__.reverse() def sort(self, key=None, reverse=False): if self.list_validator is not None: clone = self.__items__[:] clone.sort(key=key, reverse=reverse) self.__items__ = self.list_validator.validate(clone) else: self.__items__.sort(key=key, reverse=reverse) class BooleanArray(JSONArray): item_type = bool class IntegerArray(JSONArray): item_type = int, long class NumberArray(JSONArray): item_type = int, long, float class StringArray(JSONArray): item_type = str, unicode class ArrayOf(object): def __new__(cls, item_type, name='GenericArray', item_validator=None, list_validator=None): return JSONArrayType(name, (JSONArray,), dict(item_type=item_type, item_validator=item_validator, list_validator=list_validator)) JSONList = JSONArray BooleanList = BooleanArray IntegerList = IntegerArray NumberList = NumberArray StringList = StringArray ListOf = ArrayOf + + +# Abstract container properties + +class AbstractObjectProperty(AbstractProperty): + data_type = JSONObject + container = True + + +class AbstractArrayProperty(AbstractProperty): + data_type = JSONArray + container = True