Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/webrtcgateway/janus.py b/sylk/applications/webrtcgateway/janus.py
index 6bf3e23..a611843 100644
--- a/sylk/applications/webrtcgateway/janus.py
+++ b/sylk/applications/webrtcgateway/janus.py
@@ -1,248 +1,248 @@
import json
import uuid
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from autobahn.twisted.websocket import connectWS, WebSocketClientFactory, WebSocketClientProtocol
from twisted.internet import reactor, defer
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.failure import Failure
from zope.interface import implements
from sylk import __version__ as SYLK_VERSION
from sylk.applications.webrtcgateway.configuration import JanusConfig
from sylk.applications.webrtcgateway.logger import log
class JanusRequest(object):
def __init__(self, request_type, **kwargs):
self.janus = request_type
self.transaction = uuid.uuid4().hex
if JanusConfig.api_secret:
self.apisecret = JanusConfig.api_secret
self.__dict__.update(**kwargs)
@property
def type(self):
return self.janus
@property
def transaction_id(self):
return self.transaction
def as_dict(self):
return self.__dict__.copy()
class JanusError(Exception):
def __init__(self, code, reason):
super(JanusError, self).__init__(reason)
self.code = code
self.reason = reason
class JanusClientProtocol(WebSocketClientProtocol):
_janus_event_handlers = None
_janus_pending_transactions = None
_janus_keepalive_timers = None
_janus_keepalive_interval = 45
notification_center = NotificationCenter()
def onOpen(self):
self.notification_center.post_notification('JanusBackendConnected', sender=self)
self._janus_pending_transactions = {}
self._janus_keepalive_timers = {}
self._janus_event_handlers = {}
def onMessage(self, payload, isBinary):
if isBinary:
log.warn('Unexpected binary payload received')
return
self.notification_center.post_notification('WebRTCJanusTrace', sender=self, data=NotificationData(direction='INCOMING', message=payload, peer=self.peer))
try:
data = json.loads(payload)
except Exception as e:
log.warn('Error decoding payload: %s' % e)
return
try:
message_type = data.pop('janus')
except KeyError:
log.warn('Received payload lacks message type: %s' % payload)
return
transaction_id = data.pop('transaction', None)
if message_type == 'event' or transaction_id is None:
# This is an event. Janus is not very consistent here, some 'events'
# do have the transaction id set. So we check for the message type as well.
handle_id = data.pop('sender', -1)
handler = self._janus_event_handlers.get(handle_id, Null)
try:
handler(handle_id, message_type, data)
except Exception:
log.exception()
return
try:
- req, d = self._janus_pending_transactions.pop(transaction_id)
+ request, deferred = self._janus_pending_transactions.pop(transaction_id)
except KeyError:
log.warn('Discarding unexpected response: %s' % payload)
return
# events were handled above, so the only message types we get here are ack, success and error
# todo: some plugin errors are delivered with message_type == 'success' and the error code is buried somewhere in plugindata
if message_type == 'error':
code = data['error']['code']
reason = data['error']['reason']
- d.errback(JanusError(code, reason))
+ deferred.errback(JanusError(code, reason))
elif message_type == 'ack':
- d.callback(None)
+ deferred.callback(None)
else: # success
# keepalive and trickle only receive an ACK, thus are handled above in message_type == 'ack', not here
- if req.type in ('create', 'attach'):
+ if request.type in ('create', 'attach'):
result = data['data']['id']
- elif req.type in ('destroy', 'detach'):
+ elif request.type in ('destroy', 'detach'):
result = None
else: # info, message (for synchronous message requests only)
result = data
- d.callback(result)
+ deferred.callback(result)
def connectionLost(self, reason):
super(JanusClientProtocol, self).connectionLost(reason)
self.notification_center.post_notification('JanusBackendDisconnected', sender=self, data=NotificationData(reason=reason.getErrorMessage()))
def disconnect(self, code=1000, reason=u''):
self.sendClose(code, reason)
def janus_set_event_handler(self, handle_id, event_handler):
if event_handler is None:
self._janus_event_handlers.pop(handle_id, None)
else:
assert callable(event_handler)
self._janus_event_handlers[handle_id] = event_handler
- def _janus_send_request(self, req):
- data = json.dumps(req.as_dict())
+ def _janus_send_request(self, request):
+ deferred = defer.Deferred()
+ data = json.dumps(request.as_dict())
self.notification_center.post_notification('WebRTCJanusTrace', sender=self, data=NotificationData(direction='OUTGOING', message=data, peer=self.peer))
self.sendMessage(data)
- d = defer.Deferred()
- self._janus_pending_transactions[req.transaction_id] = (req, d)
- return d
+ self._janus_pending_transactions[request.transaction_id] = (request, deferred)
+ return deferred
def janus_info(self):
- req = JanusRequest('info')
- return self._janus_send_request(req)
+ request = JanusRequest('info')
+ return self._janus_send_request(request)
def janus_create_session(self):
- req = JanusRequest('create')
- return self._janus_send_request(req)
+ request = JanusRequest('create')
+ return self._janus_send_request(request)
def janus_destroy_session(self, session_id):
- req = JanusRequest('destroy', session_id=session_id)
- return self._janus_send_request(req)
+ request = JanusRequest('destroy', session_id=session_id)
+ return self._janus_send_request(request)
def janus_attach(self, session_id, plugin):
- req = JanusRequest('attach', session_id=session_id, plugin=plugin)
- return self._janus_send_request(req)
+ request = JanusRequest('attach', session_id=session_id, plugin=plugin)
+ return self._janus_send_request(request)
def janus_detach(self, session_id, handle_id):
- req = JanusRequest('detach', session_id=session_id, handle_id=handle_id)
- return self._janus_send_request(req)
+ request = JanusRequest('detach', session_id=session_id, handle_id=handle_id)
+ return self._janus_send_request(request)
def janus_message(self, session_id, handle_id, body, jsep=None):
- req = JanusRequest('message', session_id=session_id, handle_id=handle_id, body=body)
+ request = JanusRequest('message', session_id=session_id, handle_id=handle_id, body=body)
if jsep is not None:
- req.jsep = jsep
- return self._janus_send_request(req)
+ request.jsep = jsep
+ return self._janus_send_request(request)
def janus_trickle(self, session_id, handle_id, candidates):
- req = JanusRequest('trickle', session_id=session_id, handle_id=handle_id)
+ request = JanusRequest('trickle', session_id=session_id, handle_id=handle_id)
if candidates:
if len(candidates) == 1:
- req.candidate = candidates[0]
+ request.candidate = candidates[0]
else:
- req.candidates = candidates
+ request.candidates = candidates
else:
- req.candidate = {'completed': True}
- return self._janus_send_request(req)
+ request.candidate = {'completed': True}
+ return self._janus_send_request(request)
def _janus_keepalive_callback(self, result, session_id):
if isinstance(result, Failure):
self._janus_keepalive_timers.pop(session_id)
else:
self._janus_keepalive_timers[session_id] = reactor.callLater(self._janus_keepalive_interval, self._janus_send_keepalive, session_id)
def _janus_send_keepalive(self, session_id):
- req = JanusRequest('keepalive', session_id=session_id)
- d = self._janus_send_request(req)
- d.addBoth(self._janus_keepalive_callback, session_id)
- return d
+ request = JanusRequest('keepalive', session_id=session_id)
+ deferred = self._janus_send_request(request)
+ deferred.addBoth(self._janus_keepalive_callback, session_id)
+ return deferred
def janus_start_keepalive(self, session_id):
self.janus_stop_keepalive(session_id)
self._janus_keepalive_timers[session_id] = reactor.callLater(self._janus_keepalive_interval, self._janus_send_keepalive, session_id)
def janus_stop_keepalive(self, session_id):
timer = self._janus_keepalive_timers.pop(session_id, None)
if timer is not None and timer.active():
timer.cancel()
class JanusClientFactory(ReconnectingClientFactory, WebSocketClientFactory):
noisy = False
protocol = JanusClientProtocol
class JanusBackend(object):
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
self.factory = JanusClientFactory(url=JanusConfig.api_url, protocols=['janus-protocol'], useragent='SylkServer/%s' % SYLK_VERSION)
self.connector = None
self.protocol = Null
self._stopped = False
def __getattr__(self, attr):
if attr.startswith('janus_'):
return getattr(self.protocol, attr)
return self.attr
@property
def ready(self):
return self.protocol is not Null
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, name='JanusBackendConnected')
notification_center.add_observer(self, name='JanusBackendDisconnected')
self.connector = connectWS(self.factory)
def stop(self):
if self._stopped:
return
self._stopped = True
self.factory.stopTrying()
notification_center = NotificationCenter()
notification_center.discard_observer(self, name='JanusBackendConnected')
notification_center.discard_observer(self, name='JanusBackendDisconnected')
if self.connector is not None:
self.connector.disconnect()
self.connector = None
if self.protocol is not None:
self.protocol.disconnect()
self.protocol = Null
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_JanusBackendConnected(self, notification):
assert self.protocol is Null
self.protocol = notification.sender
log.info('Janus backend connection up')
self.factory.resetDelay()
def _NH_JanusBackendDisconnected(self, notification):
log.info('Janus backend connection down: %s' % notification.data.reason)
self.protocol = Null

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:54 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408912
Default Alt Text
(11 KB)

Event Timeline