Page MenuHomePhabricator

No OneTemporary

diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py
index d765855..9d40a0a 100644
--- a/mediaproxy/dispatcher.py
+++ b/mediaproxy/dispatcher.py
@@ -1,613 +1,613 @@
"""Implementation of the MediaProxy dispatcher"""
import hashlib
import random
import signal
import cPickle as pickle
import cjson
from base64 import b64encode as base64_encode
from collections import deque
from itertools import ifilter
from time import time
from application import log
from application.process import process
from application.system import unlink
from gnutls.errors import CertificateSecurityError
from gnutls.interfaces.twisted import TLSContext
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python import failure
from twisted.internet.error import ConnectionDone, TCPTimedOutError
from twisted.internet.protocol import Factory, connectionDone
from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed
from twisted.internet import reactor
from mediaproxy import __version__
from mediaproxy.configuration import DispatcherConfig
from mediaproxy.interfaces import opensips
from mediaproxy.scheduler import RecurrentCall, KeepRunning
from mediaproxy.tls import X509Credentials
class CommandError(Exception):
pass
class Command(object):
def __init__(self, name, headers=None):
self.name = name
self.headers = headers or []
try:
self.parsed_headers = dict(header.split(': ', 1) for header in self.headers)
except Exception:
raise CommandError('Could not parse command headers')
else:
self.__dict__['session_id'] = None if self.call_id is None else base64_encode(hashlib.md5(self.call_id).digest()).rstrip('=')
@property
def call_id(self):
return self.parsed_headers.get('call_id')
@property
def dialog_id(self):
return self.parsed_headers.get('dialog_id')
@property
def session_id(self):
return self.__dict__['session_id']
class ProtocolLogger(log.ContextualLogger):
def __init__(self, name):
super(ProtocolLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend
self.name = name
def apply_context(self, message):
return '[{0}] {1}'.format(self.name, message) if message != '' else ''
class SessionLogger(log.ContextualLogger):
def __init__(self, session):
super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend
self.session_id = session.call_id
self.relay_ip = session.relay_ip
def apply_context(self, message):
return '[session {0.session_id} at {0.relay_ip}] {1}'.format(self, message) if message != '' else ''
class ControlProtocol(LineOnlyReceiver):
logger = None # type: ProtocolLogger
noisy = False
def __init__(self):
self.in_progress = 0
def lineReceived(self, line):
raise NotImplementedError()
def connectionLost(self, reason=connectionDone):
if isinstance(reason.value, connectionDone.type):
self.logger.info('Connection closed')
else:
self.logger.warning('Connection lost: {}'.format(reason.value))
self.factory.connection_lost(self)
def reply(self, reply):
self.transport.write(reply + self.delimiter)
def _error_handler(self, failure):
failure.trap(CommandError, RelayError)
self.logger.error(failure.value)
self.reply('error')
def _catch_all(self, failure):
self.logger.error(failure.getTraceback())
self.reply('error')
def _decrement(self, result):
self.in_progress = 0
if self.factory.shutting_down:
self.transport.loseConnection()
def _add_callbacks(self, defer):
defer.addCallback(self.reply)
defer.addErrback(self._error_handler)
defer.addErrback(self._catch_all)
defer.addBoth(self._decrement)
class OpenSIPSControlProtocol(ControlProtocol):
logger = ProtocolLogger(name='OpenSIPS Interface')
def __init__(self):
self.request_lines = []
ControlProtocol.__init__(self)
def lineReceived(self, line):
if line == '':
if self.request_lines:
self.in_progress += 1
defer = maybeDeferred(self.handle_request, self.request_lines)
self._add_callbacks(defer)
self.request_lines = []
elif not line.endswith(': '):
self.request_lines.append(line)
def handle_request(self, request_lines):
command = Command(name=request_lines[0], headers=request_lines[1:])
if command.call_id is None:
raise CommandError('Request is missing the call_id header')
return self.factory.dispatcher.send_command(command)
class ManagementControlProtocol(ControlProtocol):
logger = ProtocolLogger(name='Management Interface')
def connectionMade(self):
if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None:
peer_cert = self.transport.getPeerCertificate()
if not DispatcherConfig.management_passport.accept(peer_cert):
- self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
+ self.transport.loseConnection()
return
def lineReceived(self, line):
if line in ['quit', 'exit']:
self.transport.loseConnection()
elif line == 'summary':
defer = self.factory.dispatcher.relay_factory.get_summary()
self._add_callbacks(defer)
elif line == 'sessions':
defer = self.factory.dispatcher.relay_factory.get_statistics()
self._add_callbacks(defer)
elif line == 'version':
self.reply(__version__)
else:
self.logger.error('Unknown command: %s' % line)
self.reply('error')
class ControlFactory(Factory):
noisy = False
def __init__(self, dispatcher):
self.dispatcher = dispatcher
self.protocols = []
self.shutting_down = False
def buildProtocol(self, addr):
protocol = Factory.buildProtocol(self, addr)
self.protocols.append(protocol)
return protocol
def connection_lost(self, prot):
self.protocols.remove(prot)
if self.shutting_down and len(self.protocols) == 0:
self.defer.callback(None)
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
if len(self.protocols) == 0:
return succeed(None)
else:
for prot in self.protocols:
if prot.in_progress == 0:
prot.transport.loseConnection()
self.defer = Deferred()
return self.defer
class OpenSIPSControlFactory(ControlFactory):
protocol = OpenSIPSControlProtocol
class ManagementControlFactory(ControlFactory):
protocol = ManagementControlProtocol
class RelayError(Exception):
pass
class ConnectionReplaced(ConnectionDone):
pass
class RelayServerProtocol(LineOnlyReceiver):
MAX_LENGTH = 4096*1024 # 4MB
noisy = False
def __init__(self):
self.ip = None # type: str
self.logger = None # type: ProtocolLogger
self.commands = {}
self.halting = False
self.timedout = False
self.disconnect_timer = None
self.sequence_number = 0
self.authenticated = False
@property
def active(self):
return not self.halting and not self.timedout
def send_command(self, command):
if command.call_id:
self.logger.info('Requesting {0.name!r} for session {0.session_id}'.format(command))
else:
self.logger.info('Requesting {0.name!r}'.format(command))
sequence_number = str(self.sequence_number)
self.sequence_number += 1
defer = Deferred()
timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, sequence_number)
self.commands[sequence_number] = (command, defer, timer)
self.transport.write(self.delimiter.join(['{} {}'.format(command.name, sequence_number)] + command.headers) + 2*self.delimiter)
return defer
def reply(self, reply):
self.transport.write(reply + self.delimiter)
def _timeout(self, sequence_number):
command, defer, timer = self.commands.pop(sequence_number)
defer.errback(RelayError('%r command failed: relay at %s timed out' % (command.name, self.ip)))
if self.timedout is False:
self.timedout = True
self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
def connectionMade(self):
if DispatcherConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
if not DispatcherConfig.passport.accept(peer_cert):
- self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
+ self.transport.loseConnection()
return
self.authenticated = True
self.factory.new_relay(self)
def lineReceived(self, line):
try:
first, rest = line.split(' ', 1)
except ValueError:
first = line
rest = ''
if first == 'expired':
try:
stats = cjson.decode(rest)
except cjson.DecodeError as e:
self.logger.error('Could not decode JSON: {}'.format(e))
else:
call_id = stats['call_id']
session = self.factory.sessions.get(call_id, None)
if session is None:
self.logger.error('Expired session has unknown call_id %s' % call_id)
return
if session.relay_ip != self.ip:
session.logger.error('relay at %s reported the session as expired, ignoring' % self.ip)
return
all_streams_ice = all(stream_info['status'] == 'unselected ICE candidate' for stream_info in stats['streams'])
if all_streams_ice:
session.logger.info('removed because ICE was used')
stats['timed_out'] = False
else:
session.logger.info('did timeout')
stats['timed_out'] = True
stats['dialog_id'] = session.dialog_id
stats['all_streams_ice'] = all_streams_ice
self.factory.dispatcher.update_statistics(session, stats)
if session.dialog_id is not None and stats['start_time'] is not None and not all_streams_ice:
self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id)
session.expire_time = time()
else:
del self.factory.sessions[call_id]
return
elif first == 'ping':
if self.timedout is True:
self.timedout = False
if self.disconnect_timer.active():
self.disconnect_timer.cancel()
self.disconnect_timer = None
self.reply('pong')
return
try:
command, defer, timer = self.commands.pop(first)
except KeyError:
self.logger.error('Got unexpected response: {}'.format(line))
return
timer.cancel()
if rest == 'error':
defer.errback(RelayError('Relay replied with error'))
elif rest == 'halting':
self.halting = True
defer.errback(RelayError('Relay is shutting down'))
elif command.name == 'remove':
try:
stats = cjson.decode(rest)
except cjson.DecodeError:
self.logger.error('Error decoding JSON')
else:
call_id = stats['call_id']
session = self.factory.sessions[call_id]
stats['dialog_id'] = session.dialog_id
stats['timed_out'] = False
self.factory.dispatcher.update_statistics(session, stats)
del self.factory.sessions[call_id]
defer.callback('removed')
else: # update command
defer.callback(rest)
def connectionLost(self, reason=connectionDone):
if reason.type == ConnectionDone:
self.logger.info('Connection closed')
elif reason.type == ConnectionReplaced:
self.logger.warning('Connection replaced')
else:
self.logger.error('Connection lost: {}'.format(reason.value))
for command, defer, timer in self.commands.itervalues():
timer.cancel()
defer.errback(RelayError('Relay disconnected'))
if self.timedout is True:
self.timedout = False
if self.disconnect_timer.active():
self.disconnect_timer.cancel()
self.disconnect_timer = None
self.factory.connection_lost(self)
class RelaySession(object):
def __init__(self, relay, command):
self.relay_ip = relay.ip
self.call_id = command.call_id
self.session_id = command.session_id
self.dialog_id = command.dialog_id
self.logger = SessionLogger(self)
self.expire_time = None
def __getstate__(self):
state = self.__dict__.copy()
del state['logger']
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.logger = SessionLogger(self)
class RelayFactory(Factory):
protocol = RelayServerProtocol
noisy = False
def __init__(self, dispatcher):
self.dispatcher = dispatcher
self.relays = {}
self.shutting_down = False
state_file = process.runtime.file('dispatcher_state')
try:
self.sessions = pickle.load(open(state_file))
except Exception:
self.sessions = {}
self.cleanup_timers = {}
else:
self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues()))
unlink(state_file)
self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions)
def _remove_expired_sessions(self):
now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after
obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())]
if obsolete:
[self.sessions.pop(call_id) for call_id in obsolete]
log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0)))
return KeepRunning
def buildProtocol(self, addr):
protocol = Factory.buildProtocol(self, addr)
protocol.ip = addr.host
protocol.logger = ProtocolLogger(name='relay {}'.format(addr.host))
protocol.logger.info('Connection established')
return protocol
def new_relay(self, relay):
old_relay = self.relays.pop(relay.ip, None)
if old_relay is not None:
relay.logger.warning('Reconnected, closing old connection')
reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced('relay reconnected')))
self.relays[relay.ip] = relay
timer = self.cleanup_timers.pop(relay.ip, None)
if timer is not None:
timer.cancel()
defer = relay.send_command(Command('sessions'))
defer.addCallback(self._cb_purge_sessions, relay.ip)
def _cb_purge_sessions(self, result, relay_ip):
relay_sessions = cjson.decode(result)
relay_call_ids = [session['call_id'] for session in relay_sessions]
for session_id, session in self.sessions.items():
if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids:
session.logger.warning('Relay does not have the session anymore, statistics are probably lost')
if session.dialog_id is not None:
self.dispatcher.opensips_management.end_dialog(session.dialog_id)
del self.sessions[session_id]
def send_command(self, command):
session = self.sessions.get(command.call_id, None)
if session and session.expire_time is None:
relay = session.relay_ip
if relay not in self.relays:
session.logger.error('Request {0.name!r} failed: relay no longer connected'.format(command))
raise RelayError('Request {0.name!r} failed: relay no longer connected'.format(command))
return self.relays[relay].send_command(command)
# We do not have a session for this call_id or the session is already expired
if command.name == 'update':
preferred_relay = command.parsed_headers.get('media_relay')
try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay)
random.shuffle(try_relays)
if preferred_relay is not None:
protocol = self.relays.get(preferred_relay)
if protocol is not None and protocol.active:
try_relays.appendleft(protocol)
else:
log.warning('user requested media_relay %s is not available' % preferred_relay)
defer = self._try_next(try_relays, command)
defer.addCallback(self._add_session, try_relays, command)
return defer
elif command.name == 'remove' and session:
# This is the remove we received for an expired session for which we triggered dialog termination
del self.sessions[command.call_id]
return 'removed'
else:
raise RelayError('Got {0.name!r} for unknown session {0.session_id}'.format(command))
def _add_session(self, result, try_relays, command):
self.sessions[command.call_id] = RelaySession(try_relays[0], command)
return result
def _relay_error(self, failure, try_relays, command):
failure.trap(RelayError)
failed_relay = try_relays.popleft()
failed_relay.logger.warning('The {0.name!r} request failed: {1.value}'.format(command, failure))
return self._try_next(try_relays, command)
def _try_next(self, try_relays, command):
if len(try_relays) == 0:
raise RelayError('No suitable relay found')
defer = try_relays[0].send_command(command)
defer.addErrback(self._relay_error, try_relays, command)
return defer
def get_summary(self):
command = Command('summary')
defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.itervalues()])
defer.addCallback(self._got_summaries)
return defer
def _summary_error(self, failure, command, relay):
relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure))
return cjson.encode(dict(status='error', ip=relay.ip))
def _got_summaries(self, results):
return '[%s]' % ', '.join(result for succeeded, result in results if succeeded)
def get_statistics(self):
command = Command('sessions')
defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.itervalues()])
defer.addCallback(self._got_statistics)
return defer
def _statistics_error(self, failure, command, relay):
relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure))
return cjson.encode([])
def _got_statistics(self, results):
return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]')
def connection_lost(self, relay):
if relay not in self.relays.itervalues():
return
if relay.authenticated:
del self.relays[relay.ip]
if self.shutting_down:
if len(self.relays) == 0:
self.defer.callback(None)
else:
self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip)
def _do_cleanup(self, ip):
log.debug('Cleaning up after old relay at %s' % ip)
del self.cleanup_timers[ip]
for call_id in (call_id for call_id, session in self.sessions.items() if session.relay_ip == ip):
del self.sessions[call_id]
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
for timer in self.cleanup_timers.itervalues():
timer.cancel()
if len(self.relays) == 0:
retval = succeed(None)
else:
for prot in self.relays.itervalues():
prot.transport.loseConnection()
self.defer = Deferred()
retval = self.defer
retval.addCallback(self._save_state)
return retval
def _save_state(self, result):
pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w'))
class Dispatcher(object):
def __init__(self):
self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)]
self.cred = X509Credentials(cert_name='dispatcher')
self.tls_context = TLSContext(self.cred)
self.relay_factory = RelayFactory(self)
dispatcher_addr, dispatcher_port = DispatcherConfig.listen
self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr)
self.opensips_factory = OpenSIPSControlFactory(self)
socket_path = process.runtime.file(DispatcherConfig.socket_path)
unlink(socket_path)
self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory)
self.opensips_management = opensips.ManagementInterface()
self.management_factory = ManagementControlFactory(self)
management_addr, management_port = DispatcherConfig.listen_management
if DispatcherConfig.management_use_tls:
self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr)
else:
self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr)
def run(self):
log.debug('Using {0.__class__.__name__}'.format(reactor))
process.signals.add_handler(signal.SIGHUP, self._handle_signal)
process.signals.add_handler(signal.SIGINT, self._handle_signal)
process.signals.add_handler(signal.SIGTERM, self._handle_signal)
process.signals.add_handler(signal.SIGUSR1, self._handle_signal)
for accounting_module in self.accounting:
accounting_module.start()
reactor.run(installSignalHandlers=False)
def stop(self):
reactor.callFromThread(self._shutdown)
def send_command(self, command):
return maybeDeferred(self.relay_factory.send_command, command)
def update_statistics(self, session, stats):
session.logger.info('statistics: {}'.format(stats))
if stats['start_time'] is not None:
for accounting in self.accounting:
try:
accounting.do_accounting(stats)
except Exception, e:
log.exception('An unhandled error occurred while doing accounting: %s' % e)
def _handle_signal(self, signum, frame):
if signum == signal.SIGUSR1:
# toggle debugging
if log.level.current != log.level.DEBUG:
log.level.current = log.level.DEBUG
log.info('Switched logging level to DEBUG')
else:
log.info('Switched logging level to {}'.format(DispatcherConfig.log_level))
log.level.current = DispatcherConfig.log_level
else:
# terminate program
signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Hangup'}
log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum)))
self.stop()
def _shutdown(self):
defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None])
defer.addCallback(lambda x: self.opensips_factory.shutdown())
defer.addCallback(lambda x: self.management_factory.shutdown())
defer.addCallback(lambda x: self.relay_factory.shutdown())
defer.addCallback(lambda x: self._stop())
def _stop(self):
for act in self.accounting:
act.stop()
reactor.stop()
diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py
index c337749..9c59f34 100644
--- a/mediaproxy/relay.py
+++ b/mediaproxy/relay.py
@@ -1,392 +1,392 @@
"""Implementation of the MediaProxy relay"""
import cjson
import signal
import resource
try:
from twisted.internet import epollreactor; epollreactor.install()
except:
raise RuntimeError('mandatory epoll reactor support is not available from the twisted framework')
from application import log
from application.process import process
from gnutls.errors import CertificateError, CertificateSecurityError
from gnutls.interfaces.twisted import TLSContext
from time import time
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError
from twisted.internet.protocol import ClientFactory, connectionDone
from twisted.internet.defer import DeferredList, succeed
from twisted.internet import reactor
from twisted.python import failure
from twisted.names import dns
from twisted.names.client import lookupService
from twisted.names.error import DomainError
from mediaproxy import __version__
from mediaproxy.configuration import RelayConfig
from mediaproxy.headers import DecodingDict, DecodingError
from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError
from mediaproxy.scheduler import RecurrentCall, KeepRunning
from mediaproxy.tls import X509Credentials
# Increase the system limit for the maximum number of open file descriptors
# to be able to handle connections to all ports in port_range
fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit))
except ValueError:
raise RuntimeError('Cannot set resource limit for maximum open file descriptors to %d' % fd_limit)
else:
new_limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if new_limits < (fd_limit, fd_limit):
raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit))
else:
log.info('Set resource limit for maximum open file descriptors to %d' % fd_limit)
class RelayClientProtocol(LineOnlyReceiver):
noisy = False
required_headers = {'update': {'call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type'},
'remove': {'call_id', 'from_tag'},
'summary': set(),
'sessions': set()}
def __init__(self):
self.command = None
self.seq = None
self.headers = DecodingDict()
self._connection_watcher = None
self._queued_keepalives = 0
def _send_keepalive(self):
if self._queued_keepalives >= 3:
log.error('missed 3 keepalive answers in a row. assuming the connection is down.')
# do not use loseConnection() as it waits to flush the output buffers.
reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
return None
self.transport.write('ping' + self.delimiter)
self._queued_keepalives += 1
return KeepRunning
def reply(self, reply):
self.transport.write(reply + self.delimiter)
def connectionMade(self):
peer = self.transport.getPeer()
log.info('Connected to dispatcher at %s:%d' % (peer.host, peer.port))
if RelayConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
if not RelayConfig.passport.accept(peer_cert):
- self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
+ self.transport.loseConnection()
self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive)
def connectionLost(self, reason=connectionDone):
if self._connection_watcher is not None:
self._connection_watcher.cancel()
self._connection_watcher = None
self._queued_keepalives = 0
def lineReceived(self, line):
if line == 'pong':
self._queued_keepalives -= 1
return
if self.command is None:
try:
command, seq = line.split()
except ValueError:
log.error('Could not decode command/sequence number pair from dispatcher: %s' % line)
return
if command in self.required_headers:
self.command = command
self.seq = seq
self.headers = DecodingDict()
else:
log.error('Unknown command: %s' % command)
self.reply('{} error'.format(seq))
elif line == '':
missing_headers = self.required_headers[self.command].difference(self.headers)
if missing_headers:
for header in missing_headers:
log.error('Missing mandatory header %r from %r command' % (header, self.command))
response = 'error'
else:
# noinspection PyBroadException
try:
response = self.factory.parent.got_command(self.factory.host, self.command, self.headers)
except Exception:
log.exception()
response = 'error'
self.reply('{} {}'.format(self.seq, response))
self.command = None
else:
try:
name, value = line.split(": ", 1)
except ValueError:
log.error('Unable to parse header: %s' % line)
else:
try:
self.headers[name] = value
except DecodingError, e:
log.error('Could not decode header: %s' % e)
class DispatcherConnectingFactory(ClientFactory):
noisy = False
protocol = RelayClientProtocol
def __init__(self, parent, host, port):
self.parent = parent
self.host = (host, port)
self.delayed = None
self.connection_lost = False
def __eq__(self, other):
return self.host == other.host
def clientConnectionFailed(self, connector, reason):
log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value))
if self.parent.connector_needs_reconnect(connector):
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
def clientConnectionLost(self, connector, reason):
self.cancel_delayed()
if reason.type != ConnectionDone:
log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value)
else:
log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__)
if self.parent.connector_needs_reconnect(connector):
if isinstance(reason.value, CertificateError) or self.connection_lost:
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
else:
self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect)
self.connection_lost = True
def buildProtocol(self, addr):
self.delayed = reactor.callLater(5, self._connected_successfully)
return ClientFactory.buildProtocol(self, addr)
def _connected_successfully(self):
self.connection_lost = False
def cancel_delayed(self):
if self.delayed:
if self.delayed.active():
self.delayed.cancel()
self.delayed = None
class SRVMediaRelayBase(object):
def __init__(self):
self.shutting_down = False
self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup)
self._do_lookup()
def _do_lookup(self):
defers = []
for addr, port, is_domain in RelayConfig.dispatchers:
if is_domain:
defer = lookupService("_sip._udp.%s" % addr)
defer.addCallback(self._cb_got_srv, port)
defer.addErrback(self._eb_no_srv, addr, port)
defers.append(defer)
else:
defers.append(succeed((addr, port)))
defer = DeferredList(defers)
defer.addCallback(self._cb_got_all)
return KeepRunning
def _cb_got_srv(self, (answers, auth, add), port):
for answer in answers:
if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."):
return str(answer.payload.target), port
raise DomainError
def _eb_no_srv(self, failure, addr, port):
failure.trap(DomainError)
return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr)
def _eb_no_dns(self, failure, addr):
failure.trap(DNSLookupError)
log.error("Could resolve neither SRV nor A record for '%s'" % addr)
def _cb_got_all(self, results):
if not self.shutting_down:
dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_signal)
process.signals.add_handler(signal.SIGINT, self._handle_signal)
process.signals.add_handler(signal.SIGTERM, self._handle_signal)
process.signals.add_handler(signal.SIGUSR1, self._handle_signal)
reactor.run(installSignalHandlers=False)
def stop(self, graceful=False):
reactor.callFromThread(self._shutdown, graceful=graceful)
def _handle_signal(self, signum, frame):
if signum == signal.SIGUSR1:
# toggle debugging
if log.level.current != log.level.DEBUG:
log.level.current = log.level.DEBUG
log.info('Switched logging level to DEBUG')
else:
log.info('Switched logging level to {}'.format(RelayConfig.log_level))
log.level.current = RelayConfig.log_level
else:
# terminate program
signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Graceful shutdown'}
log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum)))
self.stop(graceful=(signum == signal.SIGHUP))
def _shutdown(self, graceful=False):
raise NotImplementedError()
@staticmethod
def _shutdown_done():
reactor.stop()
try:
from mediaproxy.sipthor import SIPThorMediaRelayBase as MediaRelayBase
except ImportError:
MediaRelayBase = SRVMediaRelayBase
class MediaRelay(MediaRelayBase):
def __init__(self):
self.cred = X509Credentials(cert_name='relay')
self.tls_context = TLSContext(self.cred)
self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end)
self.dispatchers = set()
self.dispatcher_session_count = {}
self.dispatcher_connectors = {}
self.old_connectors = {}
self.shutting_down = False
self.graceful_shutdown = False
self.start_time = time()
super(MediaRelay, self).__init__()
@property
def status(self):
if self.graceful_shutdown or self.shutting_down:
return 'halting'
else:
return 'active'
def update_dispatchers(self, dispatchers):
dispatchers = set(dispatchers)
for new_dispatcher in dispatchers.difference(self.dispatchers):
if new_dispatcher in self.old_connectors.iterkeys():
log.info('Restoring old dispatcher at %s:%d' % new_dispatcher)
self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher)
else:
log.info('Adding new dispatcher at %s:%d' % new_dispatcher)
dispatcher_addr, dispatcher_port = new_dispatcher
factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port)
self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context)
for old_dispatcher in self.dispatchers.difference(dispatchers):
log.info('Removing old dispatcher at %s:%d' % old_dispatcher)
self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher)
self._check_disconnect(old_dispatcher)
self.dispatchers = dispatchers
def got_command(self, dispatcher, command, headers):
if command == 'summary':
summary = {'ip': RelayConfig.relay_ip,
'version': __version__,
'status': self.status,
'uptime': int(time() - self.start_time),
'session_count': len(self.session_manager.sessions),
'stream_count': self.session_manager.stream_count,
'bps_relayed': self.session_manager.bps_relayed}
return cjson.encode(summary)
elif command == 'sessions':
return cjson.encode(self.session_manager.statistics)
elif command == 'update':
if self.graceful_shutdown or self.shutting_down:
if not self.session_manager.has_session(**headers):
log.info('cannot add new session: media-relay is shutting down')
return 'halting'
try:
local_media = self.session_manager.update_session(dispatcher, **headers)
except RelayPortsExhaustedError:
log.error('Could not reserve relay ports for session, all allocated ports are being used')
return 'error'
if local_media:
return ' '.join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media])
else: # command == 'remove'
session = self.session_manager.remove_session(**headers)
if session is None:
return 'error'
else:
return cjson.encode(session.statistics)
def session_expired(self, session):
connector = self.dispatcher_connectors.get(session.dispatcher)
if connector is None:
connector = self.old_connectors.get(session.dispatcher)
if connector and connector.state == 'connected':
connector.transport.write(' '.join(['expired', cjson.encode(session.statistics)]) + connector.factory.protocol.delimiter)
else:
log.warning('dispatcher for expired session is no longer online, statistics are lost!')
def add_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1
def remove_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] -= 1
if self.dispatcher_session_count[dispatcher] == 0:
del self.dispatcher_session_count[dispatcher]
if self.graceful_shutdown and not self.dispatcher_session_count:
self._shutdown()
elif dispatcher in self.old_connectors:
self._check_disconnect(dispatcher)
def _check_disconnect(self, dispatcher):
connector = self.old_connectors[dispatcher]
if self.dispatcher_session_count.get(dispatcher, 0) == 0:
old_state = connector.state
connector.factory.cancel_delayed()
connector.disconnect()
if old_state == "disconnected":
del self.old_connectors[dispatcher]
if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown_done()
def connector_needs_reconnect(self, connector):
if connector in self.dispatcher_connectors.values():
return True
else:
for dispatcher, old_connector in self.old_connectors.items():
if old_connector is connector:
if self.dispatcher_session_count.get(dispatcher, 0) > 0:
return True
else:
del self.old_connectors[dispatcher]
break
if self.shutting_down:
if len(self.old_connectors) == 0:
self._shutdown_done()
return False
def _shutdown(self, graceful=False):
if graceful:
self.graceful_shutdown = True
if self.dispatcher_session_count:
return
if not self.shutting_down:
self.shutting_down = True
self.srv_monitor.cancel()
self.session_manager.cleanup()
if len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown_done()
else:
self.update_dispatchers([])

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 11:24 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409107
Default Alt Text
(42 KB)

Event Timeline