diff --git a/mediaproxy/__init__.py b/mediaproxy/__init__.py
index 9d5a4f5..eaf5b92 100644
--- a/mediaproxy/__init__.py
+++ b/mediaproxy/__init__.py
@@ -1,34 +1,31 @@
# Copyright (C) 2008 AG-Projects.
#
"""Mediaproxy implements a media relay for SIP calls"""
__version__ = "2.5.2"
system_config_directory = '/etc/mediaproxy'
runtime_directory = '/var/run/mediaproxy'
configuration_filename = 'config.ini'
-default_dispatcher_port = 25060
-default_management_port = 25061
-
package_requirements = {'python-application': '1.2.8',
'python-gnutls': '1.1.8',
'twisted': '2.5.0'}
try:
from application.dependency import ApplicationDependencies, DependencyError
except ImportError:
class DependencyError(Exception): pass
class ApplicationDependencies(object):
def __init__(self, *args, **kw):
pass
def check(self):
required_version = package_requirements['python-application']
raise DependencyError("need python-application version %s or higher but it's not installed" % required_version)
dependencies = ApplicationDependencies(**package_requirements)
diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py
new file mode 100644
index 0000000..30fc16b
--- /dev/null
+++ b/mediaproxy/configuration/__init__.py
@@ -0,0 +1,88 @@
+# Copyright (C) 2008-2014 AG Projects
+#
+
+from application.configuration import ConfigSection, ConfigSetting
+from application.configuration.datatypes import IPAddress
+from application.system import host
+
+from mediaproxy import configuration_filename
+from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator
+
+
+class DispatcherConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'Dispatcher'
+
+ socket_path = "dispatcher.sock"
+ listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress("any"))
+ listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress("any"))
+ relay_timeout = 5 # How much to wait for an answer from a relay
+ relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it
+ cleanup_dead_relays_after = 43200 # 12 hours
+ cleanup_expired_sessions_after = 86400 # 24 hours
+ management_use_tls = True
+ accounting = ConfigSetting(type=AccountingModuleList, value=[])
+ passport = ConfigSetting(type=X509NameValidator, value=None)
+ management_passport = ConfigSetting(type=X509NameValidator, value=None)
+
+
+class RelayConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'Relay'
+
+ relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip)
+ stream_timeout = 90
+ on_hold_timeout = 7200
+ traffic_sampling_period = 15
+ userspace_transmit_every = 1
+ dispatchers = ConfigSetting(type=DispatcherAddressList, value=[])
+ port_range = PortRange("50000:60000")
+ dns_check_interval = PositiveInteger(60)
+ keepalive_interval = PositiveInteger(10)
+ reconnect_delay = PositiveInteger(10)
+ passport = ConfigSetting(type=X509NameValidator, value=None)
+
+
+class OpenSIPSConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'OpenSIPS'
+
+ socket_path = '/var/run/opensips/socket'
+ max_connections = 10
+
+
+class RadiusConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'Radius'
+
+ config_file = "/etc/opensips/radius/client.conf"
+ additional_dictionary = "radius/dictionary"
+
+
+class DatabaseConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'Database'
+
+ dburi = ""
+ sessions_table = "media_sessions"
+ callid_column = "call_id"
+ fromtag_column = "from_tag"
+ totag_column = "to_tag"
+ info_column = "info"
+
+
+class TLSConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'TLS'
+
+ certs_path = 'tls'
+ verify_interval = 300
+
+
+class ThorNetworkConfig(ConfigSection):
+ __cfgfile__ = configuration_filename
+ __section__ = 'ThorNetwork'
+
+ domain = ConfigSetting(type=SIPThorDomain, value=None)
+ node_ip = host.default_ip
+
diff --git a/mediaproxy/configuration/datatypes.py b/mediaproxy/configuration/datatypes.py
new file mode 100644
index 0000000..98687d7
--- /dev/null
+++ b/mediaproxy/configuration/datatypes.py
@@ -0,0 +1,118 @@
+# Copyright (C) 2008-2014 AG Projects
+#
+
+import re
+
+from application.configuration.datatypes import IPAddress, NetworkAddress, StringList
+from gnutls import crypto
+
+
+class DispatcherIPAddress(NetworkAddress):
+ default_port = 25060
+
+
+class DispatcherManagementAddress(NetworkAddress):
+ default_port = 25061
+
+
+class AccountingModuleList(StringList):
+ _valid_backends = set(('database', 'radius'))
+
+ def __new__(cls, value):
+ proposed_backends = set(StringList.__new__(cls, value))
+ return list(proposed_backends & cls._valid_backends)
+
+
+class DispatcherAddress(tuple):
+ default_port = 25060
+
+ def __new__(cls, value):
+        match = re.search(r"^(?P<address>.+?):(?P<port>\d+)$", value)
+ if match:
+ address = str(match.group("address"))
+ port = int(match.group("port"))
+ else:
+ address = value
+ port = cls.default_port
+ try:
+ address = IPAddress(address)
+ is_domain = False
+ except ValueError:
+ is_domain = True
+ return tuple.__new__(cls, (address, port, is_domain))
+
+
+class DispatcherAddressList(list):
+ def __init__(cls, value):
+ list.__init__(cls, (DispatcherAddress(dispatcher) for dispatcher in re.split(r'\s*,\s*|\s+', value)))
+
+
+class PortRange(object):
+ """A port range in the form start:end with start and end being even numbers in the [1024, 65536] range"""
+ def __init__(self, value):
+ self.start, self.end = [int(p) for p in value.split(':', 1)]
+ allowed = xrange(1024, 65537, 2)
+ if not (self.start in allowed and self.end in allowed and self.start < self.end):
+ raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value)
+ def __repr__(self):
+ return "%s('%d:%d')" % (self.__class__.__name__, self.start, self.end)
+
+
+class PositiveInteger(int):
+ def __new__(cls, value):
+ instance = int.__new__(cls, value)
+ if instance < 1:
+ raise ValueError("value must be a positive integer")
+ return instance
+
+
+class SIPThorDomain(str):
+ """A SIP Thor domain name or the keyword None"""
+ def __new__(cls, name):
+ if name is None:
+ return None
+ elif not isinstance(name, basestring):
+ raise TypeError("domain name must be a string, unicode or None")
+ if name.lower() == 'none':
+ return None
+ return name
+
+
+class X509NameValidator(crypto.X509Name):
+ def __new__(cls, dname):
+ if dname.lower() == 'none':
+ return None
+ return crypto.X509Name.__new__(cls, dname)
+
+ def __init__(self, dname):
+ str.__init__(self)
+        pairs = [x.replace('\,', ',') for x in re.split(r'(?<!\\),\s*', dname)]
+        for pair in pairs:
+            try:
+                name, value = pair.split(':', 1)
+            except ValueError:
+                raise ValueError("Invalid certificate access list: %s" % dname)
+            if name not in self.ids:
+                raise ValueError("Invalid authorization attribute: %s" % name)
+            str.__setattr__(self, name, value)
+        for name in crypto.X509Name.ids:
+            if not hasattr(self, name):
+                str.__setattr__(self, name, None)
+
+    def accept(self, cert):
+        for id in self.ids:
+            validator_attr = getattr(self, id)
+            if validator_attr is not None:
+                cert_attr = getattr(cert.subject, id)
+                if validator_attr[0] == '*':
+                    if not cert_attr.endswith(validator_attr[1:]):
+                        return False
+                elif validator_attr[-1] == '*':
+                    if not cert_attr.startswith(validator_attr[:-1]):
+                        return False
+                elif validator_attr != cert_attr:
+                    return False
+        return True
diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py
--- a/mediaproxy/dispatcher.py
+++ b/mediaproxy/dispatcher.py
 # Copyright (C) 2008 AG-Projects.
#
"""Implementation of the MediaProxy dispatcher"""
import random
import signal
import cPickle as pickle
import cjson
from collections import deque
from itertools import ifilter
from time import time
-for name in ('epollreactor', 'kqreactor', 'pollreactor', 'selectreactor'):
- try: __import__('twisted.internet.%s' % name, globals(), locals(), fromlist=[name]).install()
- except: continue
- else: break
+from application import log
+from application.process import process
+from application.system import unlink
+from gnutls.errors import CertificateSecurityError
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python import failure
from twisted.internet.error import ConnectionDone, TCPTimedOutError
from twisted.internet.protocol import Factory
from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed
from twisted.internet import reactor
-from gnutls.errors import CertificateSecurityError
-
-from application import log
-from application.process import process
-from application.configuration import ConfigSection, ConfigSetting
-from application.configuration.datatypes import NetworkAddress, StringList
-from application.system import unlink
-from mediaproxy import configuration_filename, default_dispatcher_port, default_management_port, __version__
-from mediaproxy.tls import X509Credentials, X509NameValidator
+from mediaproxy import __version__
+from mediaproxy.configuration import DispatcherConfig
from mediaproxy.interfaces import opensips
from mediaproxy.scheduler import RecurrentCall, KeepRunning
-
+from mediaproxy.tls import X509Credentials
log.msg("Twisted is using %s" % reactor.__module__.rsplit('.', 1)[-1])
-class DispatcherAddress(NetworkAddress):
- default_port = default_dispatcher_port
-
-class DispatcherManagementAddress(NetworkAddress):
- default_port = default_management_port
-
-class AccountingModuleList(StringList):
- _valid_backends = set(('database', 'radius'))
-
- def __new__(cls, value):
- proposed_backends = set(StringList.__new__(cls, value))
- invalid_names = proposed_backends - cls._valid_backends
- for name in invalid_names:
- log.warn("Ignoring invalid accounting module name: `%s'" % name)
- return list(proposed_backends & cls._valid_backends)
-
-
-class Config(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'Dispatcher'
-
- socket_path = "dispatcher.sock"
- listen = ConfigSetting(type=DispatcherAddress, value=DispatcherAddress("any"))
- listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress("any"))
- relay_timeout = 5 # How much to wait for an answer from a relay
- relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it
- cleanup_dead_relays_after = 43200 # 12 hours
- cleanup_expired_sessions_after = 86400 # 24 hours
- management_use_tls = True
- accounting = ConfigSetting(type=AccountingModuleList, value=[])
- passport = ConfigSetting(type=X509NameValidator, value=None)
- management_passport = ConfigSetting(type=X509NameValidator, value=None)
-
-
-
class ControlProtocol(LineOnlyReceiver):
noisy = False
def __init__(self):
self.in_progress = 0
def lineReceived(self, line):
raise NotImplementedError()
def connectionLost(self, reason):
log.debug("Connection to %s lost: %s" % (self.description, reason.value))
self.factory.connection_lost(self)
def reply(self, reply):
self.transport.write(reply + "\r\n")
def _relay_error(self, failure):
failure.trap(RelayError)
log.error(failure.value)
self.transport.write("error\r\n")
def _catch_all(self, failure):
log.error(failure.getBriefTraceback())
self.transport.write("error\r\n")
def _decrement(self, result):
self.in_progress = 0
if self.factory.shutting_down:
self.transport.loseConnection()
def _add_callbacks(self, defer):
defer.addCallback(self.reply)
defer.addErrback(self._relay_error)
defer.addErrback(self._catch_all)
defer.addBoth(self._decrement)
class OpenSIPSControlProtocol(ControlProtocol):
description = "OpenSIPS"
def __init__(self):
self.line_buf = []
ControlProtocol.__init__(self)
def lineReceived(self, line):
if line == "":
if self.line_buf:
self.in_progress += 1
defer = self.factory.dispatcher.send_command(self.line_buf[0], self.line_buf[1:])
self._add_callbacks(defer)
self.line_buf = []
elif not line.endswith(": "):
self.line_buf.append(line)
class ManagementControlProtocol(ControlProtocol):
description = "Management interface client"
def connectionMade(self):
- if Config.management_use_tls and Config.management_passport is not None:
+ if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None:
peer_cert = self.transport.getPeerCertificate()
- if not Config.management_passport.accept(peer_cert):
+ if not DispatcherConfig.management_passport.accept(peer_cert):
self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
return
def lineReceived(self, line):
if line in ["quit", "exit"]:
self.transport.loseConnection()
elif line == "summary":
defer = self.factory.dispatcher.relay_factory.get_summary()
self._add_callbacks(defer)
elif line == "sessions":
defer = self.factory.dispatcher.relay_factory.get_statistics()
self._add_callbacks(defer)
elif line == "version":
self.reply(__version__)
else:
log.error("Unknown command on management interface: %s" % line)
self.reply("error")
class ControlFactory(Factory):
noisy = False
def __init__(self, dispatcher):
self.dispatcher = dispatcher
self.protocols = []
self.shutting_down = False
def buildProtocol(self, addr):
prot = Factory.buildProtocol(self, addr)
self.protocols.append(prot)
return prot
def connection_lost(self, prot):
self.protocols.remove(prot)
if self.shutting_down and len(self.protocols) == 0:
self.defer.callback(None)
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
if len(self.protocols) == 0:
return succeed(None)
else:
for prot in self.protocols:
if prot.in_progress == 0:
prot.transport.loseConnection()
self.defer = Deferred()
return self.defer
class OpenSIPSControlFactory(ControlFactory):
protocol = OpenSIPSControlProtocol
class ManagementControlFactory(ControlFactory):
protocol = ManagementControlProtocol
class RelayError(Exception):
pass
class ConnectionReplaced(ConnectionDone):
pass
class RelayServerProtocol(LineOnlyReceiver):
noisy = False
MAX_LENGTH = 4096*1024 ## (4MB)
def __init__(self):
self.commands = {}
self.halting = False
self.timedout = False
self.disconnect_timer = None
self.sequence_number = 0
self.authenticated = False
@property
def active(self):
return not self.halting and not self.timedout
def send_command(self, command, headers):
log.debug('Issuing "%s" command to relay at %s' % (command, self.ip))
seq = str(self.sequence_number)
self.sequence_number += 1
defer = Deferred()
- timer = reactor.callLater(Config.relay_timeout, self._timeout, seq, defer)
+ timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, seq, defer)
self.commands[seq] = (command, defer, timer)
self.transport.write("\r\n".join([" ".join([command, seq])] + headers + ["", ""]))
return defer
def _timeout(self, seq, defer):
del self.commands[seq]
defer.errback(RelayError("Relay at %s timed out" % self.ip))
if self.timedout is False:
self.timedout = True
- self.disconnect_timer = reactor.callLater(Config.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
+ self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
def connectionMade(self):
- if Config.passport is not None:
+ if DispatcherConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
- if not Config.passport.accept(peer_cert):
+ if not DispatcherConfig.passport.accept(peer_cert):
self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
return
self.authenticated = True
self.factory.new_relay(self)
def lineReceived(self, line):
try:
first, rest = line.split(" ", 1)
except ValueError:
first = line
rest = ""
if first == "expired":
try:
stats = cjson.decode(rest)
except cjson.DecodeError:
log.error("Error decoding JSON from relay at %s" % self.ip)
else:
call_id = stats['call_id']
session = self.factory.sessions.get(call_id, None)
if session is None:
log.error("Unknown session with call_id %s expired at relay %s" % (call_id, self.ip))
return
if session.relay_ip != self.ip:
log.error("session with call_id %s expired at relay %s, but is actually at relay %s, ignoring" % (call_id, self.ip, session.relay_ip))
return
log.msg("session with call_id %s from relay %s did timeout" % (call_id, session.relay_ip))
stats["dialog_id"] = session.dialog_id
stats["timed_out"] = True
all_streams_ice = all(stream_info["status"] == "unselected ICE candidate" for stream_info in stats["streams"])
stats["all_streams_ice"] = all_streams_ice
self.factory.dispatcher.update_statistics(stats)
if session.dialog_id is not None and stats["start_time"] is not None and not all_streams_ice:
self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id)
session.expire_time = time()
else:
del self.factory.sessions[call_id]
return
elif first == "ping":
if self.timedout is True:
self.timedout = False
if self.disconnect_timer.active():
self.disconnect_timer.cancel()
self.disconnect_timer = None
self.transport.write("pong\r\n")
return
try:
command, defer, timer = self.commands.pop(first)
except KeyError:
log.error("Got unexpected response from relay at %s: %s" % (self.ip, line))
return
timer.cancel()
if rest == "error":
defer.errback(RelayError("Received error from relay at %s in response to `%s' command" % (self.ip, command)))
elif rest == "halting":
self.halting = True
defer.errback(RelayError("Relay at %s is shutting down" % self.ip))
elif command == "remove":
try:
stats = cjson.decode(rest)
except cjson.DecodeError:
log.error("Error decoding JSON from relay at %s" % self.ip)
else:
call_id = stats['call_id']
session = self.factory.sessions[call_id]
stats["dialog_id"] = session.dialog_id
stats["timed_out"] = False
self.factory.dispatcher.update_statistics(stats)
del self.factory.sessions[call_id]
defer.callback("removed")
else: # update command
defer.callback(rest)
def connectionLost(self, reason):
if reason.type == ConnectionDone:
log.msg("Connection with relay at %s was closed" % self.ip)
elif reason.type == ConnectionReplaced:
log.warn("Old connection with relay at %s was lost" % self.ip)
else:
log.error("Connection with relay at %s was lost: %s" % (self.ip, reason.value))
for command, defer, timer in self.commands.itervalues():
timer.cancel()
defer.errback(RelayError("Relay at %s disconnected" % self.ip))
if self.timedout is True:
self.timedout = False
if self.disconnect_timer.active():
self.disconnect_timer.cancel()
self.disconnect_timer = None
self.factory.connection_lost(self)
class DialogID(str):
def __new__(cls, did):
if did is None:
return None
try:
h_entry, h_id = did.split(':')
except:
log.error("invalid dialog_id value: `%s'" % did)
return None
instance = str.__new__(cls, did)
instance.h_entry = h_entry
instance.h_id = h_id
return instance
class RelaySession(object):
def __init__(self, relay_ip, command_headers):
self.relay_ip = relay_ip
self.dialog_id = DialogID(command_headers.get('dialog_id'))
self.expire_time = None
class RelayFactory(Factory):
noisy = False
protocol = RelayServerProtocol
def __init__(self, dispatcher):
self.dispatcher = dispatcher
self.relays = {}
self.shutting_down = False
state_file = process.runtime_file("dispatcher_state")
try:
self.sessions = pickle.load(open(state_file))
except:
self.sessions = {}
self.cleanup_timers = {}
else:
- self.cleanup_timers = dict((ip, reactor.callLater(Config.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues()))
+ self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues()))
unlink(state_file)
self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions)
def _remove_expired_sessions(self):
- now, limit = time(), Config.cleanup_expired_sessions_after
+ now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after
obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())]
if obsolete:
[self.sessions.pop(call_id) for call_id in obsolete]
log.warn("found %d expired sessions which were not removed during the last %d hours" % (len(obsolete), round(limit/3600.0)))
return KeepRunning
def buildProtocol(self, addr):
ip = addr.host
log.debug("Connection from relay at %s" % ip)
prot = Factory.buildProtocol(self, addr)
prot.ip = ip
return prot
def new_relay(self, relay):
old_relay = self.relays.pop(relay.ip, None)
if old_relay is not None:
log.warn("Relay at %s reconnected, closing old connection" % relay.ip)
reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected")))
self.relays[relay.ip] = relay
timer = self.cleanup_timers.pop(relay.ip, None)
if timer is not None:
timer.cancel()
defer = relay.send_command("sessions", [])
defer.addCallback(self._cb_purge_sessions, relay.ip)
def _cb_purge_sessions(self, result, relay_ip):
relay_sessions = cjson.decode(result)
relay_call_ids = [session["call_id"] for session in relay_sessions]
for session_id, session in self.sessions.items():
if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids:
log.warn("Session %s is no longer on relay %s, statistics are probably lost" % (session_id, relay_ip))
if session.dialog_id is not None:
self.dispatcher.opensips_management.end_dialog(session.dialog_id)
del self.sessions[session_id]
def send_command(self, command, headers):
try:
parsed_headers = dict(header.split(": ", 1) for header in headers)
except:
raise RelayError("Could not parse headers from OpenSIPs")
try:
call_id = parsed_headers["call_id"]
except KeyError:
raise RelayError("Missing call_id header")
session = self.sessions.get(call_id, None)
if session and session.expire_time is None:
relay = session.relay_ip
if relay not in self.relays:
raise RelayError("Relay for this session (%s) is no longer connected" % relay)
return self.relays[relay].send_command(command, headers)
## We do not have a session for this call_id or the session is already expired
if command == "update":
preferred_relay = parsed_headers.get("media_relay")
try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay)
random.shuffle(try_relays)
if preferred_relay is not None:
protocol = self.relays.get(preferred_relay)
if protocol is not None and protocol.active:
try_relays.appendleft(protocol)
else:
log.warn("user requested media_relay %s is not available" % preferred_relay)
defer = self._try_next(try_relays, command, headers)
defer.addCallback(self._add_session, try_relays, call_id, parsed_headers)
return defer
elif command == 'remove' and session:
## This is the remove we received for an expired session for which we triggered dialog termination
del self.sessions[call_id]
return 'removed'
else:
raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id))
def _add_session(self, result, try_relays, call_id, parsed_headers):
self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers)
return result
def _relay_error(self, failure, try_relays, command, headers):
failure.trap(RelayError)
failed_relay = try_relays.popleft()
log.warn("Relay %s failed: %s" % (failed_relay, failure.value))
return self._try_next(try_relays, command, headers)
def _try_next(self, try_relays, command, headers):
if len(try_relays) == 0:
raise RelayError("No suitable relay found")
defer = try_relays[0].send_command(command, headers)
defer.addErrback(self._relay_error, try_relays, command, headers)
return defer
def get_summary(self):
defer = DeferredList([relay.send_command("summary", []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()])
defer.addCallback(self._got_summaries)
return defer
def _summary_error(self, failure, ip):
log.error("Error processing query at relay %s: %s" % (ip, failure.value))
return cjson.encode(dict(status="error", ip=ip))
def _got_summaries(self, results):
return "[%s]" % ', '.join(result for succeeded, result in results if succeeded)
def get_statistics(self):
defer = DeferredList([relay.send_command("sessions", []) for relay in self.relays.itervalues()])
defer.addCallback(self._got_statistics)
return defer
def _got_statistics(self, results):
return "[%s]" % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result!='[]')
def connection_lost(self, relay):
if relay not in self.relays.itervalues():
return
if relay.authenticated:
del self.relays[relay.ip]
if self.shutting_down:
if len(self.relays) == 0:
self.defer.callback(None)
else:
- self.cleanup_timers[relay.ip] = reactor.callLater(Config.cleanup_dead_relays_after, self._do_cleanup, relay.ip)
+ self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip)
def _do_cleanup(self, ip):
log.debug("Doing cleanup for old relay %s" % ip)
del self.cleanup_timers[ip]
for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]:
del self.sessions[call_id]
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
for timer in self.cleanup_timers.itervalues():
timer.cancel()
if len(self.relays) == 0:
retval = succeed(None)
else:
for prot in self.relays.itervalues():
prot.transport.loseConnection()
self.defer = Deferred()
retval = self.defer
retval.addCallback(self._save_state)
return retval
def _save_state(self, result):
pickle.dump(self.sessions, open(process.runtime_file("dispatcher_state"), "w"))
class Dispatcher(object):
def __init__(self):
- self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(Config.accounting)]
+ self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(DispatcherConfig.accounting)]
self.cred = X509Credentials(cert_name='dispatcher')
self.relay_factory = RelayFactory(self)
- dispatcher_addr, dispatcher_port = Config.listen
+ dispatcher_addr, dispatcher_port = DispatcherConfig.listen
self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.cred, interface=dispatcher_addr)
self.opensips_factory = OpenSIPSControlFactory(self)
- socket_path = process.runtime_file(Config.socket_path)
+ socket_path = process.runtime_file(DispatcherConfig.socket_path)
unlink(socket_path)
self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory)
self.opensips_management = opensips.ManagementInterface()
self.management_factory = ManagementControlFactory(self)
- management_addr, management_port = Config.listen_management
- if Config.management_use_tls:
+ management_addr, management_port = DispatcherConfig.listen_management
+ if DispatcherConfig.management_use_tls:
self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.cred, interface=management_addr)
else:
self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr)
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)
for accounting_module in self.accounting:
accounting_module.start()
reactor.run(installSignalHandlers=False)
def send_command(self, command, headers):
return maybeDeferred(self.relay_factory.send_command, command, headers)
def update_statistics(self, stats):
log.debug("Got statistics: %s" % stats)
if stats["start_time"] is not None:
for accounting in self.accounting:
try:
accounting.do_accounting(stats)
except Exception, e:
log.error("An unhandled error occured while doing accounting: %s" % e)
log.err()
def _handle_SIGHUP(self, *args):
log.msg("Received SIGHUP, shutting down.")
reactor.callFromThread(self._shutdown)
def _handle_SIGINT(self, *args):
if process._daemon:
log.msg("Received SIGINT, shutting down.")
else:
log.msg("Received KeyboardInterrupt, exiting.")
reactor.callFromThread(self._shutdown)
def _handle_SIGTERM(self, *args):
log.msg("Received SIGTERM, shutting down.")
reactor.callFromThread(self._shutdown)
def _shutdown(self):
defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None])
defer.addCallback(lambda x: self.opensips_factory.shutdown())
defer.addCallback(lambda x: self.management_factory.shutdown())
defer.addCallback(lambda x: self.relay_factory.shutdown())
defer.addCallback(lambda x: self._stop())
def _stop(self):
for act in self.accounting:
act.stop()
reactor.stop()
diff --git a/mediaproxy/interfaces/accounting/database.py b/mediaproxy/interfaces/accounting/database.py
index 697db1d..512f884 100644
--- a/mediaproxy/interfaces/accounting/database.py
+++ b/mediaproxy/interfaces/accounting/database.py
@@ -1,104 +1,91 @@
# Copyright (C) 2008 AG Projects
# Author: Ruud Klaver
#
"""Implementation of database accounting"""
import cjson
from application import log
from application.python.queue import EventQueue
-from application.configuration import ConfigSection
from sqlobject import SQLObject, connectionForURI, sqlhub
from sqlobject import StringCol, BLOBCol, DatabaseIndex
from sqlobject.dberrors import DatabaseError, ProgrammingError, OperationalError
-from mediaproxy import configuration_filename
+from mediaproxy.configuration import DatabaseConfig
-class Config(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'Database'
-
- dburi = ""
- sessions_table = "media_sessions"
- callid_column = "call_id"
- fromtag_column = "from_tag"
- totag_column = "to_tag"
- info_column = "info"
-
-
-if not Config.dburi:
+if not DatabaseConfig.dburi:
raise RuntimeError("Database accounting is enabled, but the database URI is not specified in config.ini")
-connection = connectionForURI(Config.dburi)
+connection = connectionForURI(DatabaseConfig.dburi)
sqlhub.processConnection = connection
class MediaSessions(SQLObject):
class sqlmeta:
- table = Config.sessions_table
- createSQL = {'mysql': 'ALTER TABLE %s ENGINE MyISAM' % Config.sessions_table}
+ table = DatabaseConfig.sessions_table
+ createSQL = {'mysql': 'ALTER TABLE %s ENGINE MyISAM' % DatabaseConfig.sessions_table}
cacheValues = False
- call_id = StringCol(length=255, dbName=Config.callid_column, notNone=True)
- from_tag = StringCol(length=64, dbName=Config.fromtag_column, notNone=True)
- to_tag = StringCol(length=64, dbName=Config.totag_column)
- info = BLOBCol(length=2**24-1, dbName=Config.info_column) # 2**24-1 makes it a mediumblob in mysql, that can hold 16 million bytes
+ call_id = StringCol(length=255, dbName=DatabaseConfig.callid_column, notNone=True)
+ from_tag = StringCol(length=64, dbName=DatabaseConfig.fromtag_column, notNone=True)
+ to_tag = StringCol(length=64, dbName=DatabaseConfig.totag_column)
+ info = BLOBCol(length=2**24-1, dbName=DatabaseConfig.info_column) # 2**24-1 makes it a mediumblob in mysql, that can hold 16 million bytes
## Indexes
callid_idx = DatabaseIndex('call_id', 'from_tag', 'to_tag', unique=True)
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError, e:
- log.error("cannot create the `%s' table: %s" % (Config.sessions_table, e))
+ log.error("cannot create the `%s' table: %s" % (DatabaseConfig.sessions_table, e))
log.msg("please make sure that the `%s' user has the CREATE and ALTER rights on the `%s' database" % (connection.user, connection.db))
log.msg("then restart the dispatcher, or you can create the table yourself using the following definition:")
log.msg("----------------- >8 -----------------")
sql, constraints = MediaSessions.createTableSQL()
statements = ';\n'.join([sql] + constraints) + ';'
log.msg(statements)
log.msg("----------------- >8 -----------------")
#raise RuntimeError(str(e))
class Accounting(object):
def __init__(self):
self.handler = DatabaseAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class DatabaseAccounting(EventQueue):
def __init__(self):
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
sqlrepr = connection.sqlrepr
- names = ', '.join([Config.callid_column, Config.fromtag_column, Config.totag_column, Config.info_column])
+ names = ', '.join([DatabaseConfig.callid_column, DatabaseConfig.fromtag_column, DatabaseConfig.totag_column, DatabaseConfig.info_column])
values = ', '.join((sqlrepr(v) for v in [stats["call_id"], stats["from_tag"], stats["to_tag"], cjson.encode(stats)]))
- q = """INSERT INTO %s (%s) VALUES (%s)""" % (Config.sessions_table, names, values)
+ q = """INSERT INTO %s (%s) VALUES (%s)""" % (DatabaseConfig.sessions_table, names, values)
try:
try:
connection.query(q)
except ProgrammingError, e:
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError:
raise e
else:
connection.query(q)
except DatabaseError, e:
log.error("failed to insert record into database: %s" % e)
diff --git a/mediaproxy/interfaces/accounting/radius.py b/mediaproxy/interfaces/accounting/radius.py
index bae38a8..30a25cc 100644
--- a/mediaproxy/interfaces/accounting/radius.py
+++ b/mediaproxy/interfaces/accounting/radius.py
@@ -1,135 +1,127 @@
# Copyright (C) 2008 AG Projects
# Author: Ruud Klaver
#
"""Implementation of RADIUS accounting"""
from application import log
from application.process import process
from application.python.queue import EventQueue
-from application.configuration import ConfigSection
import pyrad.client
import pyrad.dictionary
-from mediaproxy import configuration_filename
-
-class Config(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'Radius'
-
- config_file = "/etc/opensips/radius/client.conf"
- additional_dictionary = "radius/dictionary"
+from mediaproxy.configuration import RadiusConfig
try:
from pyrad.dictfile import DictFile
except ImportError:
# helper class to make pyrad support the $INCLUDE statement in dictionary files
class RadiusDictionaryFile(object):
def __init__(self, base_file_name):
self.file_names = [base_file_name]
self.fd_stack = [open(base_file_name)]
def readlines(self):
while True:
line = self.fd_stack[-1].readline()
if line:
if line.startswith("$INCLUDE"):
file_name = line.rstrip("\n").split(None, 1)[1]
if file_name not in self.file_names:
self.file_names.append(file_name)
self.fd_stack.append(open(file_name))
continue
else:
yield line
else:
self.fd_stack.pop()
if len(self.fd_stack) == 0:
return
else:
del DictFile
class RadiusDictionaryFile(str):
pass
class Accounting(object):
def __init__(self):
self.handler = RadiusAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class RadiusAccounting(EventQueue, pyrad.client.Client):
def __init__(self):
- main_config_file = process.config_file(Config.config_file)
+ main_config_file = process.config_file(RadiusConfig.config_file)
if main_config_file is None:
- raise RuntimeError("Cannot find the radius configuration file: `%s'" % Config.config_file)
+ raise RuntimeError("Cannot find the radius configuration file: `%s'" % RadiusConfig.config_file)
try:
config = dict(line.rstrip("\n").split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith("#"))
secrets = dict(line.rstrip("\n").split(None, 1) for line in open(config["servers"]) if len(line.split(None, 1)) == 2 and not line.startswith("#"))
server = config["acctserver"]
try:
server, acctport = server.split(":")
acctport = int(acctport)
except ValueError:
acctport = 1813
secret = secrets[server]
dicts = [RadiusDictionaryFile(config["dictionary"])]
- if Config.additional_dictionary:
- additional_dictionary = process.config_file(Config.additional_dictionary)
+ if RadiusConfig.additional_dictionary:
+ additional_dictionary = process.config_file(RadiusConfig.additional_dictionary)
if additional_dictionary:
dicts.append(RadiusDictionaryFile(additional_dictionary))
else:
- log.warn("Could not load additional RADIUS dictionary file: `%s'" % Config.additional_dictionary)
+ log.warn("Could not load additional RADIUS dictionary file: `%s'" % RadiusConfig.additional_dictionary)
raddict = pyrad.dictionary.Dictionary(*dicts)
timeout = int(config["radius_timeout"])
retries = int(config["radius_retries"])
except Exception:
log.fatal("cannot read the RADIUS configuration file")
raise
pyrad.client.Client.__init__(self, server, 1812, acctport, secret, raddict)
self.timeout = timeout
self.retries = retries
if "bindaddr" in config and config["bindaddr"] != "*":
self.bind((config["bindaddr"], 0))
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
attrs = {}
attrs["Acct-Status-Type"] = "Update"
attrs["User-Name"] = "mediaproxy@default"
attrs["Acct-Session-Id"] = stats["call_id"]
attrs["Acct-Session-Time"] = stats["duration"]
attrs["Acct-Input-Octets"] = sum(stream_stats['caller_bytes'] for stream_stats in stats['streams'])
attrs["Acct-Output-Octets"] = sum(stream_stats['callee_bytes'] for stream_stats in stats['streams'])
attrs["Sip-From-Tag"] = stats["from_tag"]
attrs["Sip-To-Tag"] = stats["to_tag"] or ""
attrs["NAS-IP-Address"] = stats["streams"][0]["caller_local"].split(":")[0]
attrs["Sip-User-Agents"] = (stats["caller_ua"] + "+" + stats["callee_ua"])[:253]
attrs["Sip-Applications"] = ', '.join(sorted(set(stream['media_type'] for stream in stats['streams'] if stream['start_time'] != stream['end_time'])))[:253]
attrs["Media-Codecs"] = ', '.join(stream['caller_codec'] for stream in stats['streams'])[:253]
if stats["timed_out"] and not stats.get("all_streams_ice", False):
attrs["Media-Info"] = "timeout"
elif stats.get("all_streams_ice", False):
attrs["Media-Info"] = "ICE session"
else:
attrs["Media-Info"] = ""
for stream in stats["streams"]:
if stream["post_dial_delay"] is not None:
attrs["Acct-Delay-Time"] = int(stream["post_dial_delay"])
break
try:
self.SendPacket(self.CreateAcctPacket(**attrs))
except Exception, e:
log.error("failed to send radius accounting record: %s" % e)
diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py
index de712ee..83ccbb0 100644
--- a/mediaproxy/interfaces/opensips.py
+++ b/mediaproxy/interfaces/opensips.py
@@ -1,169 +1,159 @@
# Copyright (C) 2006-2008 AG Projects.
#
"""The OpenSIPS Management Interface"""
import socket
from collections import deque
from twisted.internet import reactor, defer
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.failure import Failure
-from application.configuration import ConfigSection
from application.python.types import Singleton
from application.process import process
from application.system import unlink
from application import log
-from mediaproxy import configuration_filename
-
-
-class OpenSIPSConfig(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'OpenSIPS'
-
- socket_path = '/var/run/opensips/socket'
- max_connections = 10
-
+from mediaproxy.configuration import OpenSIPSConfig
class Error(Exception): pass
class CommandError(Error): pass
class TimeoutError(Error): pass
class NegativeReplyError(Error): pass
class Request(object):
def __init__(self, command):
self.command = command
self.deferred = defer.Deferred()
class UNIXSocketProtocol(DatagramProtocol):
noisy = False
def datagramReceived(self, data, address):
deferred = self.transport.deferred
if deferred is None or deferred.called:
return
# accumulate in a buffer until message end (do this later when implemented by opensips) -Dan
if not data:
failure = Failure(CommandError("Empty reply from OpenSIPS"))
deferred.errback(failure)
return
try:
status, msg = data.split('\n', 1)
except ValueError:
failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply"))
deferred.errback(failure)
return
if status.upper() == '200 OK':
deferred.callback(msg)
else:
deferred.errback(Failure(NegativeReplyError(status)))
class UNIXSocketConnection(object):
timeout = 3
def __init__(self, socket_path):
self._initialized = False
self.path = socket_path
self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol())
reactor.addSystemEventTrigger('during', 'shutdown', self.close)
self.transport.deferred = None ## placeholder for the deferred used by a request
self._initialized = True
def close(self):
if self._initialized:
self.transport.stopListening()
unlink(self.path)
def _get_deferred(self):
return self.transport.deferred
def _set_deferred(self, d):
self.transport.deferred = d
deferred = property(_get_deferred, _set_deferred)
def _did_timeout(self, deferred):
if deferred.called:
return
deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout")))
def send(self, request):
self.deferred = request.deferred
try:
self.transport.write(request.command, OpenSIPSConfig.socket_path)
except socket.error, why:
log.error("cannot write request to `%s': %s" % (OpenSIPSConfig.socket_path, why[1]))
self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS")))
else:
reactor.callLater(self.timeout, self._did_timeout, self.deferred)
class UNIXSocketConnectionPool(object):
"""Pool of UNIX socket connection to OpenSIPS"""
def __init__(self, max_connections=10, pool_id=''):
assert max_connections > 0, 'maximum should be > 0'
self.max = max_connections
self.id = pool_id
self.workers = 0
self.waiters = deque()
self.connections = deque()
def _create_connections_as_needed(self):
while self.workers < self.max and len(self.waiters) > len(self.connections):
socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1)
socket_path = process.runtime_file(socket_name)
unlink(socket_path)
try:
conn = UNIXSocketConnection(socket_path)
except CannotListenError, why:
log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why))
break
self.connections.append(conn)
self.workers += 1
def _release_connection(self, result, conn):
self.connections.append(conn)
self._process_waiters()
return result
def _process_waiters(self):
while self.waiters:
try:
conn = self.connections.popleft()
except IndexError:
return
request = self.waiters.popleft()
request.deferred.addBoth(self._release_connection, conn)
conn.send(request)
def defer_to_connection(self, command):
request = Request(command)
self.waiters.append(request)
self._create_connections_as_needed()
self._process_waiters()
return request.deferred
class ManagementInterface(object):
__metaclass__ = Singleton
def __init__(self):
self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections)
## Reply handlers __RH_xxx
def __RH_end_dialog(self, result):
if isinstance(result, Failure):
log.error("failed to end dialog: %s" % result.value)
return False
return True
def end_dialog(self, dialog_id):
cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id)
return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog)
diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py
index c03285b..99a1f61 100644
--- a/mediaproxy/mediacontrol.py
+++ b/mediaproxy/mediacontrol.py
@@ -1,811 +1,796 @@
# Copyright (C) 2008 AG Projects
# Author: Ruud Klaver
#
import struct
from time import time
from collections import deque
from operator import attrgetter
from itertools import chain
-from zope.interface import implements
+from application import log
from twisted.internet import reactor
from twisted.internet.interfaces import IReadDescriptor
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.log import Logger
+from zope.interface import implements
-from application import log
-from application.system import host
-from application.configuration import ConfigSection, ConfigSetting
-from application.configuration.datatypes import IPAddress
-
-from mediaproxy import configuration_filename
+from mediaproxy.configuration import RelayConfig
from mediaproxy.interfaces.system import _conntrack
from mediaproxy.iputils import is_routable_ip
from mediaproxy.scheduler import RecurrentCall, KeepRunning
UDP_TIMEOUT_FILE = "/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream"
rtp_payloads = {
0: "G711u", 1: "1016", 2: "G721", 3: "GSM", 4: "G723", 5: "DVI4", 6: "DVI4",
7: "LPC", 8: "G711a", 9: "G722", 10: "L16", 11: "L16", 14: "MPA", 15: "G728",
18: "G729", 25: "CelB", 26: "JPEG", 28: "nv", 31: "H261", 32: "MPV", 33: "MP2T",
34: "H263"
}
class RelayPortsExhaustedError(Exception):
pass
-class Config(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'Relay'
-
- relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip)
- stream_timeout = 90
- on_hold_timeout = 7200
- traffic_sampling_period = 15
- userspace_transmit_every = 1
-
-
-if Config.relay_ip is None:
+if RelayConfig.relay_ip is None:
raise RuntimeError("Could not determine default host IP; either add default route or specify relay IP manually")
class Address(object):
"""Representation of an endpoint address"""
def __init__(self, host, port, in_use=True, got_rtp=False):
self.host = host
self.port = port
self.in_use = self.__nonzero__() and in_use
self.got_rtp = got_rtp
def __len__(self):
return 2
def __nonzero__(self):
return None not in (self.host, self.port)
def __getitem__(self, index):
return (self.host, self.port)[index]
def __contains__(self, item):
return item in (self.host, self.port)
def __iter__(self):
yield self.host
yield self.port
def __str__(self):
return self.__nonzero__() and ("%s:%d" % (self.host, self.port)) or "Unknown"
def __repr__(self):
return "%s(%r, %r, in_use=%r, got_rtp=%r)" % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp)
def forget(self):
self.host, self.port, self.in_use, self.got_rtp = None, None, False, False
@property
def unknown(self):
return None in (self.host, self.port)
@property
def obsolete(self):
return self.__nonzero__() and not self.in_use
class Counters(dict):
def __add__(self, other):
n = Counters(self)
for k, v in other.iteritems():
n[k] += v
return n
def __iadd__(self, other):
for k, v in other.iteritems():
self[k] += v
return self
@property
def caller_bytes(self):
return self['caller_bytes']
@property
def callee_bytes(self):
return self['callee_bytes']
@property
def caller_packets(self):
return self['caller_packets']
@property
def callee_packets(self):
return self['callee_packets']
@property
def relayed_bytes(self):
return self['caller_bytes'] + self['callee_bytes']
@property
def relayed_packets(self):
return self['caller_packets'] + self['callee_packets']
class StreamListenerProtocol(DatagramProtocol):
noisy = False
def __init__(self):
self.cb_func = None
self.sdp = None
self.send_packet_count = 0
self.stun_queue = []
def datagramReceived(self, data, (host, port)):
if self.cb_func is not None:
self.cb_func(host, port, data)
def set_remote_sdp(self, ip, port):
if is_routable_ip(ip):
self.sdp = ip, port
else:
self.sdp = None
def send(self, data, is_stun, ip=None, port=None):
if is_stun:
self.stun_queue.append(data)
if ip is None or port is None:
# this means that we have not received any packets from this host yet,
# so we have not learnt its address
if self.sdp is None:
# we can't do anything if we haven't received the SDP IP yet or
# it was in a private range
return
ip, port = self.sdp
# we learnt the IP, empty the STUN packets queue
if self.stun_queue:
for data in self.stun_queue:
self.transport.write(data, (ip, port))
self.stun_queue = []
if not is_stun:
- if not self.send_packet_count % Config.userspace_transmit_every:
+ if not self.send_packet_count % RelayConfig.userspace_transmit_every:
self.transport.write(data, (ip, port))
self.send_packet_count += 1
def _stun_test(data):
# Check if data is a STUN request and if it's a binding request
if len(data) < 20:
return False, False
msg_type, msg_len, magic = struct.unpack("!HHI", data[:8])
if msg_type & 0xc == 0 and magic == 0x2112A442:
if msg_type == 0x0001:
return True, True
else:
return True, False
else:
return False, False
class MediaSubParty(object):
def __init__(self, substream, listener):
self.substream = substream
self.listener = listener
self.listener.protocol.cb_func = self.got_data
self.remote = Address(None, None)
host = self.listener.protocol.transport.getHost()
self.local = Address(host.host, host.port)
self.timer = None
self.codec = "Unknown"
self.got_stun_probing = False
self.reset()
def reset(self):
if self.timer and self.timer.active():
self.timer.cancel()
- self.timer = reactor.callLater(Config.stream_timeout, self.substream.expired, "no-traffic timeout", Config.stream_timeout)
+ self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout)
self.remote.in_use = False # keep remote address around but mark it as obsolete
self.remote.got_rtp = False
self.got_stun_probing = False
self.listener.protocol.send_packet_count = 0
def before_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
- self.timer = reactor.callLater(Config.on_hold_timeout, self.substream.expired, "on hold timeout", Config.on_hold_timeout)
+ self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, "on hold timeout", RelayConfig.on_hold_timeout)
def after_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
if not self.remote.in_use:
- self.timer = reactor.callLater(Config.stream_timeout, self.substream.expired, "no-traffic timeout", Config.stream_timeout)
+ self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout)
def got_data(self, host, port, data):
if (host, port) == tuple(self.remote):
if self.remote.obsolete:
# the received packet matches the previously used IP/port,
# which has been made obsolete, so ignore it
return
else:
if self.remote.in_use:
# the received packet is different than the recorded IP/port,
# so we will discard it
return
# we have learnt the remote IP/port
self.remote.host, self.remote.port = host, port
self.remote.in_use = True
log.debug("Got traffic information for stream: %s" % self.substream.stream)
is_stun, is_binding_request = _stun_test(data)
self.substream.send_data(self, data, is_stun)
if not self.remote.got_rtp and not is_stun:
# This is the first RTP packet received
self.remote.got_rtp = True
if self.timer:
if self.timer.active():
self.timer.cancel()
self.timer = None
if self.codec == "Unknown" and self.substream is self.substream.stream.rtp:
try:
pt = ord(data[1]) & 127
except IndexError:
pass
else:
if pt > 95:
self.codec = "Dynamic(%d)" % pt
elif pt in rtp_payloads:
self.codec = rtp_payloads[pt]
else:
self.codec = "Unknown(%d)" % pt
self.substream.check_create_conntrack()
if is_binding_request:
self.got_stun_probing = True
def cleanup(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = None
self.listener.protocol.cb_func = None
self.substream = None
class MediaSubStream(object):
def __init__(self, stream, listener_caller, listener_callee):
self.stream = stream
self.forwarding_rule = None
self.caller = MediaSubParty(self, listener_caller)
self.callee = MediaSubParty(self, listener_callee)
self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0)
@property
def counters(self):
"""Accumulated counters from all the forwarding rules the stream had"""
if self.forwarding_rule is None:
return self._counters
else:
try:
return self._counters + self.forwarding_rule.counters
except _conntrack.Error:
return self._counters
def _stop_relaying(self):
if self.forwarding_rule is not None:
try:
self._counters += self.forwarding_rule.counters
except _conntrack.Error:
pass
self.forwarding_rule = None
def reset(self, party):
if party == "caller":
self.caller.reset()
else:
self.callee.reset()
self._stop_relaying()
def check_create_conntrack(self):
if self.stream.first_media_time is None:
self.stream.first_media_time = time()
if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp:
self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark)
self.forwarding_rule.expired_func = self.conntrack_expired
def send_data(self, source, data, is_stun):
if source is self.caller:
dest = self.callee
else:
dest = self.caller
if dest.remote:
# if we have already learnt the remote address of the destination, use that
ip, port = dest.remote.host, dest.remote.port
dest.listener.protocol.send(data, is_stun, ip, port)
else:
# otherwise use the IP/port specified in the SDP, if public
dest.listener.protocol.send(data, is_stun)
def conntrack_expired(self):
try:
timeout_wait = int(open(UDP_TIMEOUT_FILE).read())
except:
timeout_wait = 0
self.expired("conntrack timeout", timeout_wait)
def expired(self, reason, timeout_wait):
self._stop_relaying()
self.stream.substream_expired(self, reason, timeout_wait)
def cleanup(self):
self.caller.cleanup()
self.callee.cleanup()
self._stop_relaying()
self.stream = None
class MediaParty(object):
def __init__(self, stream):
self.manager = stream.session.manager
self._remote_sdp = None
self.is_on_hold = False
self.uses_ice = False
while True:
self.listener_rtp = None
self.ports = port_rtp, port_rtcp = self.manager.get_ports()
try:
- self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=Config.relay_ip)
- self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=Config.relay_ip)
+ self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
+ self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
except CannotListenError:
if self.listener_rtp is not None:
self.listener_rtp.stopListening()
self.manager.set_bad_ports(self.ports)
log.warn("Cannot use port pair %d/%d" % self.ports)
else:
break
def _get_remote_sdp(self):
return self._remote_sdp
def _set_remote_sdp(self, (ip, port)):
self._remote_sdp = ip, port
self.listener_rtp.protocol.set_remote_sdp(ip, port)
remote_sdp = property(_get_remote_sdp, _set_remote_sdp)
def cleanup(self):
self.listener_rtp.stopListening()
self.listener_rtcp.stopListening()
self.manager.free_ports(self.ports)
self.manager = None
class MediaStream(object):
def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party):
self.is_alive = True
self.session = session
self.media_type = media_type
self.caller = MediaParty(self)
self.callee = MediaParty(self)
self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp)
self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp)
getattr(self, initiating_party).remote_sdp = (media_ip, media_port)
getattr(self, initiating_party).uses_ice = (media_parameters.get("ice", "no") == "yes")
self.check_hold(initiating_party, direction, media_ip)
self.create_time = time()
self.first_media_time = None
self.start_time = None
self.end_time = None
self.status = "active"
self.timeout_wait = 0
def __str__(self):
if self.caller.remote_sdp is None:
src = "Unknown"
else:
src = "%s:%d" % self.caller.remote_sdp
if self.caller.is_on_hold:
src += " ON HOLD"
if self.caller.uses_ice:
src += " (ICE)"
if self.callee.remote_sdp is None:
dst = "Unknown"
else:
dst = "%s:%d" % self.callee.remote_sdp
if self.callee.is_on_hold:
dst += " ON HOLD"
if self.callee.uses_ice:
dst += " (ICE)"
rtp = self.rtp
rtcp = self.rtcp
return "(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)" % (
self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote)
@property
def counters(self):
return self.rtp.counters + self.rtcp.counters
@property
def is_on_hold(self):
return self.caller.is_on_hold or self.callee.is_on_hold
def check_hold(self, party, direction, ip):
previous_hold = self.is_on_hold
party = getattr(self, party)
if direction == "sendonly" or direction == "inactive":
party.is_on_hold = True
elif ip == "0.0.0.0":
party.is_on_hold = True
else:
party.is_on_hold = False
if previous_hold and not self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = "active"
subparty.after_hold()
if not previous_hold and self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = "on hold"
subparty.before_hold()
def reset(self, party, media_ip, media_port):
self.rtp.reset(party)
self.rtcp.reset(party)
getattr(self, party).remote_sdp = (media_ip, media_port)
def substream_expired(self, substream, reason, timeout_wait):
if substream is self.rtp and reason == "no-traffic timeout" and self.caller.uses_ice and self.callee.uses_ice and (substream.caller.got_stun_probing or substream.callee.got_stun_probing):
reason = "unselected ICE candidate"
if substream is self.rtcp or (self.is_on_hold and reason=='conntrack timeout'):
# Forget about the remote addresses, this will cause any
# re-occurence of the same traffic to be forwarded again
substream.caller.remote.forget()
substream.caller.listener.protocol.send_packet_count = 0
substream.callee.remote.forget()
substream.callee.listener.protocol.send_packet_count = 0
else:
session = self.session
self.cleanup(reason)
self.timeout_wait = timeout_wait
session.stream_expired(self)
def cleanup(self, status="closed"):
if self.is_alive:
self.is_alive = False
self.status = status
self.caller.cleanup()
self.callee.cleanup()
self.rtp.cleanup()
self.rtcp.cleanup()
self.session = None
self.end_time = time()
class Session(object):
def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark = 0):
self.manager = manager
self.dispatcher = dispatcher
self.call_id = call_id
self.from_tag = from_tag
self.to_tag = None
self.mark = mark
self.from_uri = from_uri
self.to_uri = to_uri
self.caller_ua = None
self.callee_ua = None
self.cseq = None
self.previous_cseq = None
self.streams = {}
self.start_time = None
self.end_time = None
self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq)
def __str__(self):
return "%s: %s (%s) --> %s" % (self.call_id, self.from_uri, self.from_tag, self.to_uri)
def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq):
if self.cseq is None:
old_cseq = (0,0)
else:
old_cseq = self.cseq
if is_caller_cseq:
cseq = (cseq, old_cseq[1])
if self.to_tag is None and to_tag is not None:
self.to_tag = to_tag
else:
cseq = (old_cseq[0], cseq)
if is_downstream:
party = "caller"
if self.caller_ua is None:
self.caller_ua = user_agent
else:
party = "callee"
if self.callee_ua is None:
self.callee_ua = user_agent
if self.cseq is None or cseq > self.cseq:
if not media_list:
return
log.debug("Received new SDP offer")
self.streams[cseq] = new_streams = []
if self.cseq is None:
old_streams = []
else:
old_streams = self.streams[self.cseq]
for media_type, media_ip, media_port, media_direction, media_parameters in media_list:
stream = None
for old_stream in old_streams:
old_remote = getattr(old_stream, party).remote_sdp
if old_remote is not None:
old_ip, old_port = old_remote
else:
old_ip, old_port = None, None
if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))):
stream = old_stream
stream.check_hold(party, media_direction, media_ip)
log.debug("Found matching existing stream: %s" % stream)
break
if stream is None:
stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party)
log.debug("Added new stream: %s" % stream)
if media_port == 0:
stream.cleanup()
log.debug("Stream explicitly closed: %s" % stream)
new_streams.append(stream)
if self.previous_cseq is not None:
for stream in self.streams[self.previous_cseq]:
if stream not in self.streams[self.cseq] + new_streams:
stream.cleanup()
self.previous_cseq = self.cseq
self.cseq = cseq
elif self.cseq == cseq:
log.debug("Received updated SDP answer")
now = time()
if self.start_time is None:
self.start_time = now
current_streams = self.streams[cseq]
for stream in current_streams:
if stream.start_time is None:
stream.start_time = now
if to_tag is not None and not media_list:
return
if len(media_list) < len(current_streams):
for stream in current_streams[len(media_list):]:
log.debug("Stream rejected by not being included in the SDP answer: %s" % stream)
stream.cleanup("rejected")
if stream.start_time is None:
stream.start_time = now
for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list):
if stream.media_type != media_type:
raise ValueError('Media types do not match: "%s" and "%s"' % (stream.media_type, media_type))
if media_port == 0:
log.debug("Stream explicitly rejected: %s" % stream)
stream.cleanup("rejected")
continue
stream.check_hold(party, media_direction, media_ip)
party_info = getattr(stream, party)
party_info.uses_ice = (media_parameters.get("ice", "no") == "yes")
if party_info.remote_sdp is None or party_info.remote_sdp[0] == "0.0.0.0":
party_info.remote_sdp = (media_ip, media_port)
log.debug("Got initial answer from %s for stream: %s" % (party, stream))
else:
if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'):
stream.reset(party, media_ip, media_port)
log.debug("Updated %s for stream: %s" % (party, stream))
else:
log.debug("Unchanged stream: %s" % stream)
if self.previous_cseq is not None:
for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]:
log.debug("Removing old stream: %s" % stream)
stream.cleanup()
else:
log.debug("Received old CSeq %d:%d, ignoring" % cseq)
def get_local_media(self, is_downstream, cseq, is_caller_cseq):
if is_caller_cseq:
pos = 0
else:
pos = 1
try:
cseq = max(key for key in self.streams.keys() if key[pos] == cseq)
except ValueError:
return None
if is_downstream:
retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]]
else:
retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]]
return retval
def cleanup(self):
self.end_time = time()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
for stream in self.streams[cseq]:
stream.cleanup()
def stream_expired(self, stream):
active_streams = set()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
active_streams.update([stream for stream in self.streams[cseq] if stream.is_alive])
if len(active_streams) == 0:
self.manager.session_expired(self.call_id, self.from_tag)
@property
def duration(self):
if self.start_time is not None:
if self.end_time is not None:
return int(self.end_time - self.start_time)
else:
return int(time() - self.start_time)
else:
return 0
@property
def relayed_bytes(self):
return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues())))
    @property
    def statistics(self):
        """Accounting snapshot of the session: call-level attributes plus a
        per-stream list of timing, codec, address and traffic details."""
        all_streams = set(chain(*self.streams.itervalues()))
        attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration')
        stats = dict((name, getattr(self, name)) for name in attributes)
        stats['caller_ua'] = self.caller_ua or 'Unknown'
        stats['callee_ua'] = self.callee_ua or 'Unknown'
        stats['streams'] = streams = []
        stream_attributes = ('media_type', 'status', 'timeout_wait')
        # report streams in creation order
        for stream in sorted(all_streams, key=attrgetter('start_time')):
            info = dict((name, getattr(stream, name)) for name in stream_attributes)
            info['caller_codec'] = stream.rtp.caller.codec
            info['callee_codec'] = stream.rtp.callee.codec
            if stream.start_time is None:
                # stream never started
                info['start_time'] = info['end_time'] = None
            elif self.start_time is None:
                # session itself never started; report zero offsets
                info['start_time'] = info['end_time'] = 0
            else:
                # stream times are offsets (seconds) relative to session start
                info['start_time'] = max(int(stream.start_time - self.start_time), 0)
                if stream.status == 'rejected':
                    info['end_time'] = info['start_time']
                else:
                    if stream.end_time is None:
                        info['end_time'] = stats['duration']
                    else:
                        info['end_time'] = min(int(stream.end_time - self.start_time), self.duration)
            if stream.first_media_time is None:
                info['post_dial_delay'] = None
            else:
                info['post_dial_delay'] = stream.first_media_time - stream.create_time
            caller = stream.rtp.caller
            callee = stream.rtp.callee
            # merge the traffic counters in first so the address keys below
            # cannot be overwritten by them
            info.update(stream.counters)
            info['caller_local'] = str(caller.local)
            info['callee_local'] = str(callee.local)
            info['caller_remote'] = str(caller.remote)
            info['callee_remote'] = str(callee.remote)
            streams.append(info)
        return stats
class SessionManager(Logger):
    """Registry of the relay's active sessions.

    Owns the relay port pool, watches conntrack flow expiration as a twisted
    reader, and periodically measures the aggregate relayed traffic speed.
    """
    implements(IReadDescriptor)
    def __init__(self, relay, start_port, end_port):
        self.relay = relay
        # pre-allocate even/odd port pairs from the configured range
        self.ports = deque((i, i+1) for i in xrange(start_port, end_port, 2))
        self.bad_ports = deque()
        self.sessions = {}
        self.watcher = _conntrack.ExpireWatcher()
        self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement
        self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement
        self.bps_relayed = 0
        # NOTE(review): the four lines below still carry unified-diff markers
        # (the surrounding patch renames Config to RelayConfig here).
- if Config.traffic_sampling_period > 0:
- self.speed_calculator = RecurrentCall(Config.traffic_sampling_period, self._measure_speed)
+ if RelayConfig.traffic_sampling_period > 0:
+ self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed)
        else:
            self.speed_calculator = None
        reactor.addReader(self)
    def _measure_speed(self):
        """Recurrently recompute self.bps_relayed from the byte counters."""
        start_time = time()
        current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues())
        # NOTE(review): diff markers again -- Config becomes RelayConfig.
- self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / Config.traffic_sampling_period
+ self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period
        self.active_byte_counter = current_byte_counter
        self.closed_byte_counter = 0
        # warn when the measurement itself takes noticeable time
        us_taken = int((time() - start_time) * 1000000)
        if us_taken > 10000:
            log.warn("Aggregate speed calculation time exceeded 10ms: %d us for %d sessions" % (us_taken, len(self.sessions)))
        return KeepRunning
    # implemented for IReadDescriptor
    def fileno(self):
        return self.watcher.fd
    def doRead(self):
        # a conntrack flow expired; let the owning stream handle it
        stream = self.watcher.read()
        if stream:
            stream.expired_func()
    def connectionLost(self, reason):
        reactor.removeReader(self)
    # port management
    def get_ports(self):
        """Return a free port pair; raise RelayPortsExhaustedError if none left."""
        if len(self.bad_ports) > len(self.ports):
            # too many ports were quarantined; assume they are usable again
            log.debug("Excessive amount of bad ports, doing cleanup")
            self.ports.extend(self.bad_ports)
            self.bad_ports = deque()
        try:
            return self.ports.popleft()
        except IndexError:
            raise RelayPortsExhaustedError()
    def set_bad_ports(self, ports):
        # quarantine a port pair that misbehaved
        self.bad_ports.append(ports)
    def free_ports(self, ports):
        self.ports.append(ports)
    # called by higher level
    def _find_session_key(self, call_id, from_tag, to_tag):
        """Locate the session key by (call_id, from_tag) or (call_id, to_tag)."""
        key_from = (call_id, from_tag)
        if key_from in self.sessions:
            return key_from
        if to_tag:
            key_to = (call_id, to_tag)
            if key_to in self.sessions:
                return key_to
        return None
    def has_session(self, call_id, from_tag, to_tag=None, **kw):
        # a session matches on either the From or the To tag
        return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None)
    def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw):
        """Create or update a session from a dispatcher 'update' command and
        return its local media description (None for an empty reply).
        """
        # NOTE(review): media=[] is a mutable default argument; it is not
        # mutated in this method, but confirm callees before relying on it.
        key = self._find_session_key(call_id, from_tag, to_tag)
        if key:
            session = self.sessions[key]
            log.debug("updating existing session %s" % session)
            is_downstream = (session.from_tag != from_tag) ^ (type == "request")
            is_caller_cseq = (session.from_tag == from_tag)
            session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq)
        elif type == "reply" and not media:
            return None
        else:
            is_downstream = type == "request"
            is_caller_cseq = True
            session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq)
            self.sessions[(call_id, from_tag)] = session
            self.relay.add_session(dispatcher)
            log.debug("created new session %s" % session)
        return session.get_local_media(is_downstream, cseq, is_caller_cseq)
    def remove_session(self, call_id, from_tag, to_tag=None, **kw):
        """Remove a session at the dispatcher's request; return it or None."""
        key = self._find_session_key(call_id, from_tag, to_tag)
        try:
            session = self.sessions[key]
        except KeyError:
            log.warn("The dispatcher tried to remove a session which is no longer present on the relay")
            return None
        log.debug("removing session %s" % session)
        session.cleanup()
        self.closed_byte_counter += session.relayed_bytes
        del self.sessions[key]
        reactor.callLater(0, self.relay.remove_session, session.dispatcher)
        return session
    def session_expired(self, call_id, from_tag):
        """Tear down a fully-expired session and notify the relay."""
        key = (call_id, from_tag)
        try:
            session = self.sessions[key]
        except KeyError:
            log.warn("A session expired that was no longer present on the relay")
            return
        log.debug("expired session %s" % session)
        session.cleanup()
        self.closed_byte_counter += session.relayed_bytes
        del self.sessions[key]
        self.relay.session_expired(session)
        self.relay.remove_session(session.dispatcher)
    def cleanup(self):
        """Expire every remaining session and stop the speed measurement."""
        if self.speed_calculator is not None:
            self.speed_calculator.cancel()
        # keys() copies the key list (Python 2), so deleting entries inside
        # session_expired() while iterating is safe
        for key in self.sessions.keys():
            self.session_expired(*key)
    @property
    def statistics(self):
        # accounting snapshots of all active sessions
        return [session.statistics for session in self.sessions.itervalues()]
    @property
    def stream_count(self):
        """Mapping of media type to the number of currently alive streams."""
        stream_count = {}
        for session in self.sessions.itervalues():
            for stream in set(chain(*session.streams.itervalues())):
                if stream.is_alive:
                    stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1
        return stream_count
diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py
index a22d76e..70d745b 100644
--- a/mediaproxy/relay.py
+++ b/mediaproxy/relay.py
@@ -1,446 +1,389 @@
# Copyright (C) 2008 AG Projects
# Author: Ruud Klaver
#
"""Implementation of the MediaProxy relay"""
import cjson
import signal
import resource
-import re
from time import time
try: from twisted.internet import epollreactor; epollreactor.install()
except: raise RuntimeError("mandatory epoll reactor support is missing from the twisted framework")
+from application import log
+from application.process import process
+from gnutls.errors import CertificateError, CertificateSecurityError
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError
from twisted.internet.protocol import ClientFactory
from twisted.internet.defer import DeferredList, succeed
from twisted.internet import reactor
from twisted.python import failure
from twisted.names import dns
from twisted.names.client import lookupService
from twisted.names.error import DomainError
-from gnutls.errors import CertificateError, CertificateSecurityError
-
-from application import log
-from application.configuration import ConfigSection, ConfigSetting
-from application.configuration.datatypes import IPAddress
-from application.process import process
-from application.system import host
-
-from mediaproxy.tls import X509Credentials, X509NameValidator
+from mediaproxy import __version__
+from mediaproxy.configuration import RelayConfig
from mediaproxy.headers import DecodingDict, DecodingError
from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError
from mediaproxy.scheduler import RecurrentCall, KeepRunning
-from mediaproxy import __version__, configuration_filename, default_dispatcher_port
-
-
-class DispatcherAddress(tuple):
- def __new__(cls, value):
- match = re.search(r"^(?P.+?):(?P\d+)$", value)
- if match:
- address = str(match.group("address"))
- port = int(match.group("port"))
- else:
- address = value
- port = default_dispatcher_port
- try:
- address = IPAddress(address)
- is_domain = False
- except ValueError:
- is_domain = True
- return tuple.__new__(cls, (address, port, is_domain))
-
-class DispatcherAddressList(list):
- def __init__(cls, value):
- list.__init__(cls, (DispatcherAddress(dispatcher) for dispatcher in re.split(r'\s*,\s*|\s+', value)))
-
-class PortRange(object):
- """A port range in the form start:end with start and end being even numbers in the [1024, 65536] range"""
- def __init__(self, value):
- self.start, self.end = [int(p) for p in value.split(':', 1)]
- allowed = xrange(1024, 65537, 2)
- if not (self.start in allowed and self.end in allowed and self.start < self.end):
- raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value)
- def __repr__(self):
- return "%s('%d:%d')" % (self.__class__.__name__, self.start, self.end)
-
-class PositiveInteger(int):
- def __new__(cls, value):
- instance = int.__new__(cls, value)
- if instance < 1:
- raise ValueError("value must be a positive integer")
- return instance
-
-
-class Config(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'Relay'
-
- dispatchers = ConfigSetting(type=DispatcherAddressList, value=[])
- relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip)
- port_range = PortRange("50000:60000")
- dns_check_interval = PositiveInteger(60)
- keepalive_interval = PositiveInteger(10)
- reconnect_delay = PositiveInteger(10)
- passport = ConfigSetting(type=X509NameValidator, value=None)
-
+from mediaproxy.tls import X509Credentials
## Increase the system limit for the maximum number of open file descriptors
## to be able to handle connections to all ports in port_range
try:
- fd_limit = Config.port_range.end - Config.port_range.start + 1000
+ fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000
resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit))
except ValueError:
raise RuntimeError("Cannot set resource limit for maximum open file descriptors to %d" % fd_limit)
else:
new_limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if new_limits < (fd_limit, fd_limit):
raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit))
else:
log.msg("Set resource limit for maximum open file descriptors to %d" % fd_limit)
class RelayClientProtocol(LineOnlyReceiver):
noisy = False
required_headers = {'update': set(['call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type']),
'remove': set(['call_id', 'from_tag']),
'summary': set(),
'sessions': set()}
def __init__(self):
self.command = None
self.seq = None
self._connection_watcher = None
self._queued_keepalives = 0
def _send_keepalive(self):
if self._queued_keepalives >= 3:
# 3 keepalives in a row didn't get an answer. assume connection is down.
log.error("missed 3 keepalive answers in a row. assuming the connection is down.")
# do not use loseConnection() as it waits to flush the output buffers.
reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
return None
self.transport.write("ping\r\n")
self._queued_keepalives += 1
return KeepRunning
def connectionMade(self):
peer = self.transport.getPeer()
log.debug("Connected to dispatcher at %s:%d" % (peer.host, peer.port))
- if Config.passport is not None:
+ if RelayConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
- if not Config.passport.accept(peer_cert):
+ if not RelayConfig.passport.accept(peer_cert):
self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
- self._connection_watcher = RecurrentCall(Config.keepalive_interval, self._send_keepalive)
+ self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive)
def connectionLost(self, reason):
if self._connection_watcher is not None:
self._connection_watcher.cancel()
self._connection_watcher = None
self._queued_keepalives = 0
def lineReceived(self, line):
if line == 'pong':
self._queued_keepalives -= 1
return
if self.command is None:
try:
command, seq = line.split()
except ValueError:
log.error("Could not decode command/sequence number pair from dispatcher: %s" % line)
return
if command in self.required_headers:
self.command = command
self.seq = seq
self.headers = DecodingDict()
else:
log.error("Unknown command: %s" % command)
self.transport.write("%s error\r\n" % seq)
elif line == "":
try:
missing_headers = self.required_headers[self.command].difference(self.headers)
if missing_headers:
for header in missing_headers:
log.error("Missing mandatory header '%s' from '%s' command" % (header, self.command))
response = "error"
else:
try:
response = self.factory.parent.got_command(self.factory.host, self.command, self.headers)
except:
log.err()
response = "error"
finally:
self.transport.write("%s %s\r\n" % (self.seq, response))
self.command = None
else:
try:
name, value = line.split(": ", 1)
except ValueError:
log.error("Unable to parse header: %s" % line)
else:
try:
self.headers[name] = value
except DecodingError, e:
log.error("Could not decode header: %s" % e)
class DispatcherConnectingFactory(ClientFactory):
noisy = False
protocol = RelayClientProtocol
def __init__(self, parent, host, port):
self.parent = parent
self.host = (host, port)
self.delayed = None
self.connection_lost = False
def __eq__(self, other):
return self.host == other.host
def clientConnectionFailed(self, connector, reason):
- log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (Config.reconnect_delay, reason.value))
+ log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value))
if self.parent.connector_needs_reconnect(connector):
- self.delayed = reactor.callLater(Config.reconnect_delay, connector.connect)
+ self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
def clientConnectionLost(self, connector, reason):
self.cancel_delayed()
if reason.type != ConnectionDone:
log.error("Connection with dispatcher at %(host)s:%(port)d was lost: %%s" % connector.__dict__ % reason.value)
else:
log.msg("Connection with dispatcher at %(host)s:%(port)d was closed" % connector.__dict__)
if self.parent.connector_needs_reconnect(connector):
if isinstance(reason.value, CertificateError) or self.connection_lost:
- self.delayed = reactor.callLater(Config.reconnect_delay, connector.connect)
+ self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
else:
- self.delayed = reactor.callLater(min(Config.reconnect_delay, 1), connector.connect)
+ self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect)
self.connection_lost = True
def buildProtocol(self, addr):
self.delayed = reactor.callLater(5, self._connected_successfully)
return ClientFactory.buildProtocol(self, addr)
def _connected_successfully(self):
self.connection_lost = False
def cancel_delayed(self):
if self.delayed:
if self.delayed.active():
self.delayed.cancel()
self.delayed = None
class SRVMediaRelayBase(object):
def __init__(self):
- self.srv_monitor = RecurrentCall(Config.dns_check_interval, self._do_lookup)
+ self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup)
self._do_lookup()
def _do_lookup(self):
defers = []
- for addr, port, is_domain in Config.dispatchers:
+ for addr, port, is_domain in RelayConfig.dispatchers:
if is_domain:
defer = lookupService("_sip._udp.%s" % addr)
defer.addCallback(self._cb_got_srv, port)
defer.addErrback(self._eb_no_srv, addr, port)
defers.append(defer)
else:
defers.append(succeed((addr, port)))
defer = DeferredList(defers)
defer.addCallback(self._cb_got_all)
return KeepRunning
def _cb_got_srv(self, (answers, auth, add), port):
for answer in answers:
if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."):
return str(answer.payload.target), port
raise DomainError
def _eb_no_srv(self, failure, addr, port):
failure.trap(DomainError)
return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr)
def _eb_no_dns(self, failure, addr):
failure.trap(DNSLookupError)
log.error("Could resolve neither SRV nor A record for '%s'" % addr)
def _cb_got_all(self, results):
if not self.shutting_down:
dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)
reactor.run(installSignalHandlers=False)
def _handle_SIGHUP(self, *args):
log.msg("Received SIGHUP, shutting down after all sessions have expired.")
reactor.callFromThread(self.shutdown, graceful=True)
def _handle_SIGINT(self, *args):
if process._daemon:
log.msg("Received SIGINT, shutting down.")
else:
log.msg("Received KeyboardInterrupt, exiting.")
reactor.callFromThread(self.shutdown)
def _handle_SIGTERM(self, *args):
log.msg("Received SIGTERM, shutting down.")
reactor.callFromThread(self.shutdown)
def shutdown(self, graceful=False):
raise NotImplementedError()
def on_shutdown(self):
pass
def _shutdown(self):
reactor.stop()
self.on_shutdown()
try:
from mediaproxy.sipthor import SIPThorMediaRelayBase
MediaRelayBase = SIPThorMediaRelayBase
except ImportError:
MediaRelayBase = SRVMediaRelayBase
class MediaRelay(MediaRelayBase):
def __init__(self):
self.cred = X509Credentials(cert_name='relay')
- self.session_manager = SessionManager(self, Config.port_range.start, Config.port_range.end)
+ self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end)
self.dispatchers = set()
self.dispatcher_session_count = {}
self.dispatcher_connectors = {}
self.old_connectors = {}
self.shutting_down = False
self.graceful_shutdown = False
self.start_time = time()
MediaRelayBase.__init__(self)
@property
def status(self):
if self.graceful_shutdown or self.shutting_down:
return 'halting'
else:
return 'active'
def update_dispatchers(self, dispatchers):
dispatchers = set(dispatchers)
for new_dispatcher in dispatchers.difference(self.dispatchers):
if new_dispatcher in self.old_connectors.iterkeys():
log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher)
self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher)
else:
log.debug('Adding new dispatcher at %s:%d' % new_dispatcher)
dispatcher_addr, dispatcher_port = new_dispatcher
factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port)
self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.cred)
for old_dispatcher in self.dispatchers.difference(dispatchers):
log.debug('Removing old dispatcher at %s:%d' % old_dispatcher)
self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher)
self._check_disconnect(old_dispatcher)
self.dispatchers = dispatchers
def got_command(self, dispatcher, command, headers):
if command == "summary":
- summary = {'ip' : Config.relay_ip,
+ summary = {'ip' : RelayConfig.relay_ip,
'version' : __version__,
'status' : self.status,
'uptime' : int(time() - self.start_time),
'session_count' : len(self.session_manager.sessions),
'stream_count' : self.session_manager.stream_count,
'bps_relayed' : self.session_manager.bps_relayed}
return cjson.encode(summary)
elif command == "sessions":
return cjson.encode(self.session_manager.statistics)
elif command == "update":
if self.graceful_shutdown or self.shutting_down:
if not self.session_manager.has_session(**headers):
log.debug("cannot add new session: media-relay is shutting down")
return 'halting'
try:
local_media = self.session_manager.update_session(dispatcher, **headers)
except RelayPortsExhaustedError:
log.error("Could not reserve relay ports for session, all allocated ports are being used")
return "error"
if local_media:
return " ".join([local_media[0][0]] + [str(media[1]) for media in local_media])
else: # remove
session = self.session_manager.remove_session(**headers)
if session is None:
return "error"
else:
return cjson.encode(session.statistics)
def session_expired(self, session):
connector = self.dispatcher_connectors.get(session.dispatcher)
if connector is None:
connector = self.old_connectors.get(session.dispatcher)
if connector and connector.state == "connected":
connector.transport.write(" ".join(["expired", cjson.encode(session.statistics)]) + "\r\n")
else:
log.warn("dispatcher for expired session is no longer online, statistics are lost!")
def add_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1
def remove_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] -= 1
if self.dispatcher_session_count[dispatcher] == 0:
del self.dispatcher_session_count[dispatcher]
if self.graceful_shutdown and not self.dispatcher_session_count:
self.shutdown()
elif dispatcher in self.old_connectors:
self._check_disconnect(dispatcher)
def _check_disconnect(self, dispatcher):
connector = self.old_connectors[dispatcher]
if self.dispatcher_session_count.get(dispatcher, 0) == 0:
old_state = connector.state
connector.factory.cancel_delayed()
connector.disconnect()
if old_state == "disconnected":
del self.old_connectors[dispatcher]
if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown()
def connector_needs_reconnect(self, connector):
if connector in self.dispatcher_connectors.values():
return True
else:
for dispatcher, old_connector in self.old_connectors.items():
if old_connector is connector:
if self.dispatcher_session_count.get(dispatcher, 0) > 0:
return True
else:
del self.old_connectors[dispatcher]
break
if self.shutting_down:
if len(self.old_connectors) == 0:
self._shutdown()
return False
def shutdown(self, graceful=False):
if graceful:
self.graceful_shutdown = True
if self.dispatcher_session_count:
return
if not self.shutting_down:
self.shutting_down = True
self.srv_monitor.cancel()
self.session_manager.cleanup()
if len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown()
else:
self.update_dispatchers([])
diff --git a/mediaproxy/sipthor.py b/mediaproxy/sipthor.py
index 561a500..b022050 100644
--- a/mediaproxy/sipthor.py
+++ b/mediaproxy/sipthor.py
@@ -1,85 +1,65 @@
# Copyright (C) 2008 AG Projects
# Author: Ruud Klaver
#
"""SIP Thor backend"""
from application import log
-from application.configuration import ConfigSection, ConfigSetting
-from application.system import host
-
from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL
from thor.entities import ThorEntities, GenericThorEntity
from thor.eventservice import EventServiceClient, ThorEvent
from thor.tls import X509Credentials
+from mediaproxy import __version__
+from mediaproxy.configuration import ThorNetworkConfig
+from mediaproxy.configuration.datatypes import DispatcherIPAddress
from mediaproxy.relay import SRVMediaRelayBase
-from mediaproxy import configuration_filename, default_dispatcher_port, __version__
-
-
-class SIPThorDomain(str):
- """A SIP Thor domain name or the keyword None"""
- def __new__(cls, name):
- if name is None:
- return None
- elif not isinstance(name, basestring):
- raise TypeError("domain name must be a string, unicode or None")
- if name.lower() == 'none':
- return None
- return name
-
-class ThorNetworkConfig(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'ThorNetwork'
-
- domain = ConfigSetting(type=SIPThorDomain, value=None)
- node_ip = host.default_ip
if ThorNetworkConfig.domain is None:
## SIP Thor is installed but disabled. Fake an ImportError to start in standalone media relay mode.
log.warn("SIP Thor is installed but disabled from the configuration")
raise ImportError("SIP Thor is disabled")
class SIPThorMediaRelayBase(EventServiceClient, SRVMediaRelayBase):
topics = ["Thor.Members"]
def __init__(self):
self.node = GenericThorEntity(ThorNetworkConfig.node_ip, ["media_relay"], version=__version__)
self.presence_message = ThorEvent('Thor.Presence', self.node.id)
self.shutdown_message = ThorEvent('Thor.Leave', self.node.id)
credentials = X509Credentials(cert_name='relay')
credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)
self.sipthor_dispatchers = []
self.additional_dispatchers = []
EventServiceClient.__init__(self, ThorNetworkConfig.domain, credentials)
SRVMediaRelayBase.__init__(self)
def handle_event(self, event):
if not self.shutting_down:
sip_proxy_ips = [node.ip for node in ThorEntities(event.message, role="sip_proxy")]
- self.sipthor_dispatchers = [(ip, default_dispatcher_port) for ip in sip_proxy_ips]
+ self.sipthor_dispatchers = [(ip, DispatcherIPAddress.default_port) for ip in sip_proxy_ips]
self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers)
def _cb_got_all(self, results):
if not self.shutting_down:
self.additional_dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def _handle_SIGHUP(self, *args):
SRVMediaRelayBase._handle_SIGHUP(self, *args)
def _handle_SIGINT(self, *args):
SRVMediaRelayBase._handle_SIGINT(self, *args)
def _handle_SIGTERM(self, *args):
SRVMediaRelayBase._handle_SIGTERM(self, *args)
def shutdown(self, graceful=False):
raise NotImplementedError()
diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py
index 777ad3c..23078fd 100644
--- a/mediaproxy/tls.py
+++ b/mediaproxy/tls.py
@@ -1,135 +1,86 @@
# Copyright (C) 2007-2008 AG Projects.
#
"""TLS support"""
-__all__ = ['X509Credentials', 'X509NameValidator']
+__all__ = ['X509Credentials']
import os
import stat
-import re
+from application.process import process
from gnutls import crypto
from gnutls.interfaces import twisted
-from application.process import process
-from application.configuration import ConfigSection
-
-from mediaproxy import configuration_filename
-
-
-class TLSConfig(ConfigSection):
- __cfgfile__ = configuration_filename
- __section__ = 'TLS'
-
- certs_path = 'tls'
- verify_interval = 300
-
-
-
-class X509NameValidator(crypto.X509Name):
- def __new__(cls, dname):
- if dname.lower() == 'none':
- return None
- return crypto.X509Name.__new__(cls, dname)
-
- def __init__(self, dname):
- str.__init__(self)
- pairs = [x.replace('\,', ',') for x in re.split(r'(?['"])(?P.+?)(?P=quote)""", open('mediaproxy/__init__.py').read()).group('version')
def get_link_libraries():
libiptc = CDLL(find_library('iptc'))
libip4tc = CDLL(find_library('ip4tc'))
try:
libiptc.iptc_commit
except AttributeError:
try:
libip4tc.iptc_commit
except AttributeError:
print 'No valid iptc library was found on the system. Please install iptables development libraries.'
sys.exit(1)
else:
return ['netfilter_conntrack', 'ip4tc']
else:
return ['netfilter_conntrack', 'iptc']
def setup(*args, **kwargs):
"""Mangle setup to ignore media-relay on non-linux platforms"""
if not sys.platform.startswith('linux2'):
print "WARNING: skipping the media relay component as this is a non-linux platform"
kwargs.pop('ext_modules', None)
kwargs['scripts'].remove('media-relay')
_setup(*args, **kwargs)
setup(name = "mediaproxy",
version = get_version(),
author = "Ruud Klaver",
author_email = "support@ag-projects.com",
maintainer = "AG Projects",
maintainer_email = "support@ag-projects.com",
url = "http://www.ag-projects.com/MediaProxy.html",
description = title,
long_description = description,
license = "GPL",
platforms = ["Linux"],
classifiers = [
#"Development Status :: 1 - Planning",
#"Development Status :: 2 - Pre-Alpha",
#"Development Status :: 3 - Alpha",
#"Development Status :: 4 - Beta",
"Development Status :: 5 - Production/Stable",
#"Development Status :: 6 - Mature",
#"Development Status :: 7 - Inactive",
"Intended Audience :: Service Providers",
"License :: GNU General Public License (GPL)",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python",
"Programming Language :: C"
],
- packages = ['mediaproxy', 'mediaproxy.interfaces', 'mediaproxy.interfaces.accounting', 'mediaproxy.interfaces.system'],
+ packages = ['mediaproxy', 'mediaproxy.configuration', 'mediaproxy.interfaces', 'mediaproxy.interfaces.accounting', 'mediaproxy.interfaces.system'],
scripts = ['media-relay', 'media-dispatcher'],
ext_modules = [
Extension(name = 'mediaproxy.interfaces.system._conntrack',
sources = ['mediaproxy/interfaces/system/_conntrack.c'],
libraries = get_link_libraries(),
define_macros = [('MODULE_VERSION', get_version())])
]
)