diff --git a/media-dispatcher b/media-dispatcher
index ad68c38..c774e74 100644
--- a/media-dispatcher
+++ b/media-dispatcher
@@ -1,67 +1,64 @@
#!/usr/bin/env python
-# Copyright (C) 2008 AG Projects
-#
-
"""MediaProxy Dispatcher component"""
if __name__ == "__main__":
import sys
from optparse import OptionParser
from application.process import process, ProcessError
from application.configuration import ConfigFile, datatypes
from application import log
import mediaproxy
name = "media-dispatcher"
fullname = "MediaProxy Dispatcher"
description = "MediaProxy Dispatcher component"
default_pid = mediaproxy.runtime_directory + '/dispatcher.pid'
parser = OptionParser(version="%%prog %s" % mediaproxy.__version__)
parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)")
parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File")
(options, args) = parser.parse_args()
pid_file = options.pid_file
process.system_config_directory = mediaproxy.system_config_directory
config_file = ConfigFile(mediaproxy.configuration_filename)
log.level.current = config_file.get_setting("Dispatcher", 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG)
try:
process.runtime_directory = mediaproxy.runtime_directory
except ProcessError, e:
log.fatal("Cannot start %s: %s" % (fullname, e))
sys.exit(1)
if options.fork:
try:
process.daemonize(pid_file)
except ProcessError, e:
log.fatal("Cannot start %s: %s" % (fullname, e))
sys.exit(1)
log.start_syslog(name)
log.msg("Starting %s %s" % (fullname, mediaproxy.__version__))
from mediaproxy.dispatcher import Dispatcher
if not options.fork:
from application.debug.memory import memory_dump
try:
dispatcher = Dispatcher()
except Exception, e:
log.fatal("failed to create %s: %s" % (fullname, e))
if e.__class__ is not RuntimeError:
log.err()
sys.exit(1)
dispatcher.run()
if not options.fork:
#from application.debug.memory import memory_dump
memory_dump()
diff --git a/media-relay b/media-relay
index 81560c8..b64b8e3 100644
--- a/media-relay
+++ b/media-relay
@@ -1,111 +1,108 @@
#!/usr/bin/env python
-# Copyright (C) 2008 AG Projects
-#
-
"""MediaProxy Relay component"""
from __future__ import with_statement
if __name__ == "__main__":
import errno
import sys
import subprocess
from optparse import OptionParser
from application import log
from application.configuration import ConfigFile, datatypes
from application.process import process, ProcessError
from application.version import Version
import mediaproxy
IP_FORWARD_FILE = "/proc/sys/net/ipv4/ip_forward"
CONNTRACK_ACCT_FILE = "/proc/sys/net/netfilter/nf_conntrack_acct"
KERNEL_VERSION_FILE = "/proc/sys/kernel/osrelease"
name = "media-relay"
fullname = "MediaProxy Relay"
description = "MediaProxy Relay component"
default_pid = mediaproxy.runtime_directory + '/relay.pid'
parser = OptionParser(version="%%prog %s" % mediaproxy.__version__)
parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)")
parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File")
(options, args) = parser.parse_args()
if not sys.platform.startswith('linux'):
log.fatal("Cannot start %s. A Linux host is required for operation." % fullname)
sys.exit(1)
try:
subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'})
except OSError, e:
log.fatal("Cannot start %s: failed to load the ip_tables kernel module: %s" % (fullname, e))
sys.exit(1)
try:
kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip())
except (OSError, IOError, ValueError):
log.fatal("Could not determine Linux kernel version")
sys.exit(1)
if kernel_version < Version(2, 6, 18):
log.fatal("Linux kernel version 2.6.18 or newer is required to run the media relay")
sys.exit(1)
try:
ip_forward = bool(int(open(IP_FORWARD_FILE).read()))
except (OSError, IOError, ValueError):
ip_forward = False
if not ip_forward:
log.fatal("IP forwarding is not available or not enabled (check %s)" % IP_FORWARD_FILE)
sys.exit(1)
try:
with open(CONNTRACK_ACCT_FILE, 'w') as acct_file:
acct_file.write("1")
except (IOError, OSError), e:
if e.errno != errno.ENOENT:
log.fatal("Could not enable conntrack rule counters (check %s): %s" % (CONNTRACK_ACCT_FILE, e))
sys.exit(1)
pid_file = options.pid_file
process.system_config_directory = mediaproxy.system_config_directory
config_file = ConfigFile(mediaproxy.configuration_filename)
log.level.current = config_file.get_setting("Relay", 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG)
try:
process.runtime_directory = mediaproxy.runtime_directory
except ProcessError, e:
log.fatal("Cannot start %s: %s" % (fullname, e))
sys.exit(1)
if options.fork:
try:
process.daemonize(pid_file)
except ProcessError, e:
log.fatal("Cannot start %s: %s" % (fullname, e))
sys.exit(1)
log.start_syslog(name)
log.msg("Starting %s %s" % (fullname, mediaproxy.__version__))
try:
from mediaproxy.relay import MediaRelay
if not options.fork:
from application.debug.memory import memory_dump
relay = MediaRelay()
except Exception, e:
log.fatal("failed to create %s: %s" % (fullname, e))
if e.__class__ is not RuntimeError:
log.err()
sys.exit(1)
relay.run()
if not options.fork:
#from application.debug.memory import memory_dump
memory_dump()
diff --git a/mediaproxy/__init__.py b/mediaproxy/__init__.py
index 1546898..5dd5e3b 100644
--- a/mediaproxy/__init__.py
+++ b/mediaproxy/__init__.py
@@ -1,12 +1,10 @@
-# Copyright (C) 2008-2014 AG-Projects.
-#
"""Mediaproxy implements a media relay for SIP calls"""
__version__ = "2.6.2"
system_config_directory = '/etc/mediaproxy'
runtime_directory = '/var/run/mediaproxy'
configuration_filename = 'config.ini'
diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py
index d3f0895..775aad4 100644
--- a/mediaproxy/configuration/__init__.py
+++ b/mediaproxy/configuration/__init__.py
@@ -1,90 +1,88 @@
-# Copyright (C) 2008-2014 AG Projects
-#
from application.configuration import ConfigSection, ConfigSetting
from application.configuration.datatypes import IPAddress, NetworkRangeList
from application.system import host
from mediaproxy import configuration_filename
from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator
class DispatcherConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'Dispatcher'
socket_path = "dispatcher.sock"
listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress("any"))
listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress("any"))
relay_timeout = 5 # How much to wait for an answer from a relay
relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it
cleanup_dead_relays_after = 43200 # 12 hours
cleanup_expired_sessions_after = 86400 # 24 hours
management_use_tls = True
accounting = ConfigSetting(type=AccountingModuleList, value=[])
passport = ConfigSetting(type=X509NameValidator, value=None)
management_passport = ConfigSetting(type=X509NameValidator, value=None)
class RelayConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'Relay'
relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip)
advertised_ip = ConfigSetting(type=IPAddress, value=None)
stream_timeout = 90
on_hold_timeout = 7200
traffic_sampling_period = 15
userspace_transmit_every = 1
dispatchers = ConfigSetting(type=DispatcherAddressList, value=[])
port_range = PortRange("50000:60000")
dns_check_interval = PositiveInteger(60)
keepalive_interval = PositiveInteger(10)
reconnect_delay = PositiveInteger(10)
passport = ConfigSetting(type=X509NameValidator, value=None)
routable_private_ranges = ConfigSetting(type=NetworkRangeList, value=[])
class OpenSIPSConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'OpenSIPS'
socket_path = '/var/run/opensips/socket'
max_connections = 10
class RadiusConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'Radius'
config_file = "/etc/opensips/radius/client.conf"
additional_dictionary = "radius/dictionary"
class DatabaseConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'Database'
dburi = ""
sessions_table = "media_sessions"
callid_column = "call_id"
fromtag_column = "from_tag"
totag_column = "to_tag"
info_column = "info"
class TLSConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'TLS'
certs_path = 'tls'
verify_interval = 300
class ThorNetworkConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'ThorNetwork'
domain = ConfigSetting(type=SIPThorDomain, value=None)
node_ip = host.default_ip
diff --git a/mediaproxy/configuration/datatypes.py b/mediaproxy/configuration/datatypes.py
index 98687d7..01e3049 100644
--- a/mediaproxy/configuration/datatypes.py
+++ b/mediaproxy/configuration/datatypes.py
@@ -1,118 +1,116 @@
-# Copyright (C) 2008-2014 AG Projects
-#
import re
from application.configuration.datatypes import IPAddress, NetworkAddress, StringList
from gnutls import crypto
class DispatcherIPAddress(NetworkAddress):
default_port = 25060
class DispatcherManagementAddress(NetworkAddress):
default_port = 25061
class AccountingModuleList(StringList):
_valid_backends = set(('database', 'radius'))
def __new__(cls, value):
proposed_backends = set(StringList.__new__(cls, value))
return list(proposed_backends & cls._valid_backends)
class DispatcherAddress(tuple):
default_port = 25060
def __new__(cls, value):
match = re.search(r"^(?P
.+?):(?P\d+)$", value)
if match:
address = str(match.group("address"))
port = int(match.group("port"))
else:
address = value
port = cls.default_port
try:
address = IPAddress(address)
is_domain = False
except ValueError:
is_domain = True
return tuple.__new__(cls, (address, port, is_domain))
class DispatcherAddressList(list):
def __init__(cls, value):
list.__init__(cls, (DispatcherAddress(dispatcher) for dispatcher in re.split(r'\s*,\s*|\s+', value)))
class PortRange(object):
"""A port range in the form start:end with start and end being even numbers in the [1024, 65536] range"""
def __init__(self, value):
self.start, self.end = [int(p) for p in value.split(':', 1)]
allowed = xrange(1024, 65537, 2)
if not (self.start in allowed and self.end in allowed and self.start < self.end):
raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value)
def __repr__(self):
return "%s('%d:%d')" % (self.__class__.__name__, self.start, self.end)
class PositiveInteger(int):
def __new__(cls, value):
instance = int.__new__(cls, value)
if instance < 1:
raise ValueError("value must be a positive integer")
return instance
class SIPThorDomain(str):
"""A SIP Thor domain name or the keyword None"""
def __new__(cls, name):
if name is None:
return None
elif not isinstance(name, basestring):
raise TypeError("domain name must be a string, unicode or None")
if name.lower() == 'none':
return None
return name
class X509NameValidator(crypto.X509Name):
def __new__(cls, dname):
if dname.lower() == 'none':
return None
return crypto.X509Name.__new__(cls, dname)
def __init__(self, dname):
str.__init__(self)
pairs = [x.replace('\,', ',') for x in re.split(r'(?=limit), self.sessions.iteritems())]
if obsolete:
[self.sessions.pop(call_id) for call_id in obsolete]
log.warn("found %d expired sessions which were not removed during the last %d hours" % (len(obsolete), round(limit/3600.0)))
return KeepRunning
def buildProtocol(self, addr):
ip = addr.host
log.debug("Connection from relay at %s" % ip)
prot = Factory.buildProtocol(self, addr)
prot.ip = ip
return prot
def new_relay(self, relay):
old_relay = self.relays.pop(relay.ip, None)
if old_relay is not None:
log.warn("Relay at %s reconnected, closing old connection" % relay.ip)
reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected")))
self.relays[relay.ip] = relay
timer = self.cleanup_timers.pop(relay.ip, None)
if timer is not None:
timer.cancel()
defer = relay.send_command("sessions", [])
defer.addCallback(self._cb_purge_sessions, relay.ip)
def _cb_purge_sessions(self, result, relay_ip):
relay_sessions = cjson.decode(result)
relay_call_ids = [session["call_id"] for session in relay_sessions]
for session_id, session in self.sessions.items():
if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids:
log.warn("Session %s is no longer on relay %s, statistics are probably lost" % (session_id, relay_ip))
if session.dialog_id is not None:
self.dispatcher.opensips_management.end_dialog(session.dialog_id)
del self.sessions[session_id]
def send_command(self, command, headers):
try:
parsed_headers = dict(header.split(": ", 1) for header in headers)
except:
raise RelayError("Could not parse headers from OpenSIPs")
try:
call_id = parsed_headers["call_id"]
except KeyError:
raise RelayError("Missing call_id header")
session = self.sessions.get(call_id, None)
if session and session.expire_time is None:
relay = session.relay_ip
if relay not in self.relays:
raise RelayError("Relay for this session (%s) is no longer connected" % relay)
return self.relays[relay].send_command(command, headers)
## We do not have a session for this call_id or the session is already expired
if command == "update":
preferred_relay = parsed_headers.get("media_relay")
try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay)
random.shuffle(try_relays)
if preferred_relay is not None:
protocol = self.relays.get(preferred_relay)
if protocol is not None and protocol.active:
try_relays.appendleft(protocol)
else:
log.warn("user requested media_relay %s is not available" % preferred_relay)
defer = self._try_next(try_relays, command, headers)
defer.addCallback(self._add_session, try_relays, call_id, parsed_headers)
return defer
elif command == 'remove' and session:
## This is the remove we received for an expired session for which we triggered dialog termination
del self.sessions[call_id]
return 'removed'
else:
raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id))
def _add_session(self, result, try_relays, call_id, parsed_headers):
self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers)
return result
def _relay_error(self, failure, try_relays, command, headers):
failure.trap(RelayError)
failed_relay = try_relays.popleft()
log.warn("Relay %s failed: %s" % (failed_relay, failure.value))
return self._try_next(try_relays, command, headers)
def _try_next(self, try_relays, command, headers):
if len(try_relays) == 0:
raise RelayError("No suitable relay found")
defer = try_relays[0].send_command(command, headers)
defer.addErrback(self._relay_error, try_relays, command, headers)
return defer
def get_summary(self):
defer = DeferredList([relay.send_command("summary", []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()])
defer.addCallback(self._got_summaries)
return defer
def _summary_error(self, failure, ip):
log.error("Error processing query at relay %s: %s" % (ip, failure.value))
return cjson.encode(dict(status="error", ip=ip))
def _got_summaries(self, results):
return "[%s]" % ', '.join(result for succeeded, result in results if succeeded)
def get_statistics(self):
defer = DeferredList([relay.send_command("sessions", []) for relay in self.relays.itervalues()])
defer.addCallback(self._got_statistics)
return defer
def _got_statistics(self, results):
return "[%s]" % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result!='[]')
def connection_lost(self, relay):
if relay not in self.relays.itervalues():
return
if relay.authenticated:
del self.relays[relay.ip]
if self.shutting_down:
if len(self.relays) == 0:
self.defer.callback(None)
else:
self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip)
def _do_cleanup(self, ip):
log.debug("Doing cleanup for old relay %s" % ip)
del self.cleanup_timers[ip]
for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]:
del self.sessions[call_id]
def shutdown(self):
if self.shutting_down:
return
self.shutting_down = True
for timer in self.cleanup_timers.itervalues():
timer.cancel()
if len(self.relays) == 0:
retval = succeed(None)
else:
for prot in self.relays.itervalues():
prot.transport.loseConnection()
self.defer = Deferred()
retval = self.defer
retval.addCallback(self._save_state)
return retval
def _save_state(self, result):
pickle.dump(self.sessions, open(process.runtime_file("dispatcher_state"), "w"))
class Dispatcher(object):
def __init__(self):
self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(DispatcherConfig.accounting)]
self.cred = X509Credentials(cert_name='dispatcher')
self.relay_factory = RelayFactory(self)
dispatcher_addr, dispatcher_port = DispatcherConfig.listen
self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.cred, interface=dispatcher_addr)
self.opensips_factory = OpenSIPSControlFactory(self)
socket_path = process.runtime_file(DispatcherConfig.socket_path)
unlink(socket_path)
self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory)
self.opensips_management = opensips.ManagementInterface()
self.management_factory = ManagementControlFactory(self)
management_addr, management_port = DispatcherConfig.listen_management
if DispatcherConfig.management_use_tls:
self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.cred, interface=management_addr)
else:
self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr)
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)
for accounting_module in self.accounting:
accounting_module.start()
reactor.run(installSignalHandlers=False)
def send_command(self, command, headers):
return maybeDeferred(self.relay_factory.send_command, command, headers)
def update_statistics(self, stats):
log.debug("Got statistics: %s" % stats)
if stats["start_time"] is not None:
for accounting in self.accounting:
try:
accounting.do_accounting(stats)
except Exception, e:
log.error("An unhandled error occured while doing accounting: %s" % e)
log.err()
def _handle_SIGHUP(self, *args):
log.msg("Received SIGHUP, shutting down.")
reactor.callFromThread(self._shutdown)
def _handle_SIGINT(self, *args):
if process._daemon:
log.msg("Received SIGINT, shutting down.")
else:
log.msg("Received KeyboardInterrupt, exiting.")
reactor.callFromThread(self._shutdown)
def _handle_SIGTERM(self, *args):
log.msg("Received SIGTERM, shutting down.")
reactor.callFromThread(self._shutdown)
def _shutdown(self):
defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None])
defer.addCallback(lambda x: self.opensips_factory.shutdown())
defer.addCallback(lambda x: self.management_factory.shutdown())
defer.addCallback(lambda x: self.relay_factory.shutdown())
defer.addCallback(lambda x: self._stop())
def _stop(self):
for act in self.accounting:
act.stop()
reactor.stop()
diff --git a/mediaproxy/headers.py b/mediaproxy/headers.py
index 0f461da..c169331 100644
--- a/mediaproxy/headers.py
+++ b/mediaproxy/headers.py
@@ -1,107 +1,105 @@
-# Copyright (C) 2008-2014 AG Projects
-#
"""Header encoding and decoding rules for communication between the dispatcher and relay components"""
class EncodingError(Exception):
pass
class DecodingError(Exception):
pass
class MediaProxyHeaders(object):
@classmethod
def encode(cls, name, value):
func_name = "encode_%s" % name
if hasattr(cls, func_name):
return getattr(cls, func_name)(value)
else:
return value
@classmethod
def decode(cls, name, value):
func_name = "decode_%s" % name
if hasattr(cls, func_name):
return getattr(cls, func_name)(value)
else:
return value
@staticmethod
def encode_cseq(value):
return str(value)
@staticmethod
def decode_cseq(value):
try:
return int(value)
except ValueError:
raise DecodingError("Not an integer: %s" % value)
@staticmethod
def encode_type(value):
if value not in ["request", "reply"]:
raise EncodingError('"type" header should be either "request" or "reply"')
return value
@staticmethod
def decode_type(value):
if value not in ["request", "reply"]:
raise DecodingError('"type" header should be either "request" or "reply"')
return value
@staticmethod
def encode_media(value):
try:
return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.iteritems()]) for type, ip, port, direction, parameters in value)
except:
raise EncodingError("Ill-formatted media information")
@staticmethod
def decode_media(value):
try:
streams = []
for stream_data in (data for data in value.split(",") if data):
stream_data = stream_data.split(":")
type, ip, port, direction = stream_data[:4]
parameters = dict(param.split("=") for param in stream_data[4:] if param)
streams.append((type, ip, int(port), direction, parameters))
return streams
except:
raise DecodingError("Ill-formatted media header")
class CodingDict(dict):
def __init__(self, *args, **kwargs):
if not args and not kwargs:
it = []
elif kwargs:
it = kwargs.iteritems()
elif isinstance(args[0], dict):
it = args[0].iteritems()
else:
try:
it = iter(args[0])
except:
dict.__init__(self, *args, **kwargs)
return
dict.__init__(self)
for key, value in it:
self.__setitem__(key, value)
class EncodingDict(CodingDict):
def __setitem__(self, key, value):
encoded_value = MediaProxyHeaders.encode(key, value)
dict.__setitem__(self, key, encoded_value)
class DecodingDict(CodingDict):
def __setitem__(self, key, value):
decoded_value = MediaProxyHeaders.decode(key, value)
dict.__setitem__(self, key, decoded_value)
diff --git a/mediaproxy/interfaces/__init__.py b/mediaproxy/interfaces/__init__.py
index 2c6e1a8..db2bb93 100644
--- a/mediaproxy/interfaces/__init__.py
+++ b/mediaproxy/interfaces/__init__.py
@@ -1,5 +1,3 @@
-# Copyright (C) 2008-2014 AG-Projects.
-#
"""Interfaces between Mediaproxy and the other components in the system"""
diff --git a/mediaproxy/interfaces/accounting/__init__.py b/mediaproxy/interfaces/accounting/__init__.py
index 594a06f..a24bfe9 100644
--- a/mediaproxy/interfaces/accounting/__init__.py
+++ b/mediaproxy/interfaces/accounting/__init__.py
@@ -1,5 +1,3 @@
-# Copyright (C) 2008 AG-Projects.
-#
"""Interfaces to various accounting backends"""
diff --git a/mediaproxy/interfaces/accounting/database.py b/mediaproxy/interfaces/accounting/database.py
index 512f884..1bfa22f 100644
--- a/mediaproxy/interfaces/accounting/database.py
+++ b/mediaproxy/interfaces/accounting/database.py
@@ -1,91 +1,88 @@
-# Copyright (C) 2008 AG Projects
-# Author: Ruud Klaver
-#
"""Implementation of database accounting"""
import cjson
from application import log
from application.python.queue import EventQueue
from sqlobject import SQLObject, connectionForURI, sqlhub
from sqlobject import StringCol, BLOBCol, DatabaseIndex
from sqlobject.dberrors import DatabaseError, ProgrammingError, OperationalError
from mediaproxy.configuration import DatabaseConfig
if not DatabaseConfig.dburi:
raise RuntimeError("Database accounting is enabled, but the database URI is not specified in config.ini")
connection = connectionForURI(DatabaseConfig.dburi)
sqlhub.processConnection = connection
class MediaSessions(SQLObject):
class sqlmeta:
table = DatabaseConfig.sessions_table
createSQL = {'mysql': 'ALTER TABLE %s ENGINE MyISAM' % DatabaseConfig.sessions_table}
cacheValues = False
call_id = StringCol(length=255, dbName=DatabaseConfig.callid_column, notNone=True)
from_tag = StringCol(length=64, dbName=DatabaseConfig.fromtag_column, notNone=True)
to_tag = StringCol(length=64, dbName=DatabaseConfig.totag_column)
info = BLOBCol(length=2**24-1, dbName=DatabaseConfig.info_column) # 2**24-1 makes it a mediumblob in mysql, that can hold 16 million bytes
## Indexes
callid_idx = DatabaseIndex('call_id', 'from_tag', 'to_tag', unique=True)
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError, e:
log.error("cannot create the `%s' table: %s" % (DatabaseConfig.sessions_table, e))
log.msg("please make sure that the `%s' user has the CREATE and ALTER rights on the `%s' database" % (connection.user, connection.db))
log.msg("then restart the dispatcher, or you can create the table yourself using the following definition:")
log.msg("----------------- >8 -----------------")
sql, constraints = MediaSessions.createTableSQL()
statements = ';\n'.join([sql] + constraints) + ';'
log.msg(statements)
log.msg("----------------- >8 -----------------")
#raise RuntimeError(str(e))
class Accounting(object):
def __init__(self):
self.handler = DatabaseAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class DatabaseAccounting(EventQueue):
def __init__(self):
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
sqlrepr = connection.sqlrepr
names = ', '.join([DatabaseConfig.callid_column, DatabaseConfig.fromtag_column, DatabaseConfig.totag_column, DatabaseConfig.info_column])
values = ', '.join((sqlrepr(v) for v in [stats["call_id"], stats["from_tag"], stats["to_tag"], cjson.encode(stats)]))
q = """INSERT INTO %s (%s) VALUES (%s)""" % (DatabaseConfig.sessions_table, names, values)
try:
try:
connection.query(q)
except ProgrammingError, e:
try:
MediaSessions.createTable(ifNotExists=True)
except OperationalError:
raise e
else:
connection.query(q)
except DatabaseError, e:
log.error("failed to insert record into database: %s" % e)
diff --git a/mediaproxy/interfaces/accounting/radius.py b/mediaproxy/interfaces/accounting/radius.py
index 30a25cc..5cd5f92 100644
--- a/mediaproxy/interfaces/accounting/radius.py
+++ b/mediaproxy/interfaces/accounting/radius.py
@@ -1,127 +1,124 @@
-# Copyright (C) 2008 AG Projects
-# Author: Ruud Klaver
-#
"""Implementation of RADIUS accounting"""
from application import log
from application.process import process
from application.python.queue import EventQueue
import pyrad.client
import pyrad.dictionary
from mediaproxy.configuration import RadiusConfig
try:
from pyrad.dictfile import DictFile
except ImportError:
# helper class to make pyrad support the $INCLUDE statement in dictionary files
class RadiusDictionaryFile(object):
def __init__(self, base_file_name):
self.file_names = [base_file_name]
self.fd_stack = [open(base_file_name)]
def readlines(self):
while True:
line = self.fd_stack[-1].readline()
if line:
if line.startswith("$INCLUDE"):
file_name = line.rstrip("\n").split(None, 1)[1]
if file_name not in self.file_names:
self.file_names.append(file_name)
self.fd_stack.append(open(file_name))
continue
else:
yield line
else:
self.fd_stack.pop()
if len(self.fd_stack) == 0:
return
else:
del DictFile
class RadiusDictionaryFile(str):
pass
class Accounting(object):
def __init__(self):
self.handler = RadiusAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class RadiusAccounting(EventQueue, pyrad.client.Client):
def __init__(self):
main_config_file = process.config_file(RadiusConfig.config_file)
if main_config_file is None:
raise RuntimeError("Cannot find the radius configuration file: `%s'" % RadiusConfig.config_file)
try:
config = dict(line.rstrip("\n").split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith("#"))
secrets = dict(line.rstrip("\n").split(None, 1) for line in open(config["servers"]) if len(line.split(None, 1)) == 2 and not line.startswith("#"))
server = config["acctserver"]
try:
server, acctport = server.split(":")
acctport = int(acctport)
except ValueError:
acctport = 1813
secret = secrets[server]
dicts = [RadiusDictionaryFile(config["dictionary"])]
if RadiusConfig.additional_dictionary:
additional_dictionary = process.config_file(RadiusConfig.additional_dictionary)
if additional_dictionary:
dicts.append(RadiusDictionaryFile(additional_dictionary))
else:
log.warn("Could not load additional RADIUS dictionary file: `%s'" % RadiusConfig.additional_dictionary)
raddict = pyrad.dictionary.Dictionary(*dicts)
timeout = int(config["radius_timeout"])
retries = int(config["radius_retries"])
except Exception:
log.fatal("cannot read the RADIUS configuration file")
raise
pyrad.client.Client.__init__(self, server, 1812, acctport, secret, raddict)
self.timeout = timeout
self.retries = retries
if "bindaddr" in config and config["bindaddr"] != "*":
self.bind((config["bindaddr"], 0))
EventQueue.__init__(self, self.do_accounting)
def do_accounting(self, stats):
attrs = {}
attrs["Acct-Status-Type"] = "Update"
attrs["User-Name"] = "mediaproxy@default"
attrs["Acct-Session-Id"] = stats["call_id"]
attrs["Acct-Session-Time"] = stats["duration"]
attrs["Acct-Input-Octets"] = sum(stream_stats['caller_bytes'] for stream_stats in stats['streams'])
attrs["Acct-Output-Octets"] = sum(stream_stats['callee_bytes'] for stream_stats in stats['streams'])
attrs["Sip-From-Tag"] = stats["from_tag"]
attrs["Sip-To-Tag"] = stats["to_tag"] or ""
attrs["NAS-IP-Address"] = stats["streams"][0]["caller_local"].split(":")[0]
attrs["Sip-User-Agents"] = (stats["caller_ua"] + "+" + stats["callee_ua"])[:253]
attrs["Sip-Applications"] = ', '.join(sorted(set(stream['media_type'] for stream in stats['streams'] if stream['start_time'] != stream['end_time'])))[:253]
attrs["Media-Codecs"] = ', '.join(stream['caller_codec'] for stream in stats['streams'])[:253]
if stats["timed_out"] and not stats.get("all_streams_ice", False):
attrs["Media-Info"] = "timeout"
elif stats.get("all_streams_ice", False):
attrs["Media-Info"] = "ICE session"
else:
attrs["Media-Info"] = ""
for stream in stats["streams"]:
if stream["post_dial_delay"] is not None:
attrs["Acct-Delay-Time"] = int(stream["post_dial_delay"])
break
try:
self.SendPacket(self.CreateAcctPacket(**attrs))
except Exception, e:
log.error("failed to send radius accounting record: %s" % e)
diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py
index a6a7cc3..f9d6f8b 100644
--- a/mediaproxy/interfaces/opensips.py
+++ b/mediaproxy/interfaces/opensips.py
@@ -1,159 +1,157 @@
-# Copyright (C) 2006-2014 AG Projects.
-#
"""The OpenSIPS Management Interface"""
import socket
from collections import deque
from twisted.internet import reactor, defer
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.failure import Failure
from application.python.types import Singleton
from application.process import process
from application.system import unlink
from application import log
from mediaproxy.configuration import OpenSIPSConfig
class Error(Exception): pass
class CommandError(Error): pass
class TimeoutError(Error): pass
class NegativeReplyError(Error): pass
class Request(object):
def __init__(self, command):
self.command = command
self.deferred = defer.Deferred()
class UNIXSocketProtocol(DatagramProtocol):
noisy = False
def datagramReceived(self, data, address):
deferred = self.transport.deferred
if deferred is None or deferred.called:
return
# accumulate in a buffer until message end (do this later when implemented by opensips) -Dan
if not data:
failure = Failure(CommandError("Empty reply from OpenSIPS"))
deferred.errback(failure)
return
try:
status, msg = data.split('\n', 1)
except ValueError:
failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply"))
deferred.errback(failure)
return
if status.upper() == '200 OK':
deferred.callback(msg)
else:
deferred.errback(Failure(NegativeReplyError(status)))
class UNIXSocketConnection(object):
timeout = 3
def __init__(self, socket_path):
self._initialized = False
self.path = socket_path
self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol())
reactor.addSystemEventTrigger('during', 'shutdown', self.close)
self.transport.deferred = None ## placeholder for the deferred used by a request
self._initialized = True
def close(self):
if self._initialized:
self.transport.stopListening()
unlink(self.path)
def _get_deferred(self):
return self.transport.deferred
def _set_deferred(self, d):
self.transport.deferred = d
deferred = property(_get_deferred, _set_deferred)
def _did_timeout(self, deferred):
if deferred.called:
return
deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout")))
def send(self, request):
self.deferred = request.deferred
try:
self.transport.write(request.command, OpenSIPSConfig.socket_path)
except socket.error, why:
log.error("cannot write request to `%s': %s" % (OpenSIPSConfig.socket_path, why[1]))
self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS")))
else:
reactor.callLater(self.timeout, self._did_timeout, self.deferred)
class UNIXSocketConnectionPool(object):
"""Pool of UNIX socket connection to OpenSIPS"""
def __init__(self, max_connections=10, pool_id=''):
assert max_connections > 0, 'maximum should be > 0'
self.max = max_connections
self.id = pool_id
self.workers = 0
self.waiters = deque()
self.connections = deque()
def _create_connections_as_needed(self):
while self.workers < self.max and len(self.waiters) > len(self.connections):
socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1)
socket_path = process.runtime_file(socket_name)
unlink(socket_path)
try:
conn = UNIXSocketConnection(socket_path)
except CannotListenError, why:
log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why))
break
self.connections.append(conn)
self.workers += 1
def _release_connection(self, result, conn):
self.connections.append(conn)
self._process_waiters()
return result
def _process_waiters(self):
while self.waiters:
try:
conn = self.connections.popleft()
except IndexError:
return
request = self.waiters.popleft()
request.deferred.addBoth(self._release_connection, conn)
conn.send(request)
def defer_to_connection(self, command):
request = Request(command)
self.waiters.append(request)
self._create_connections_as_needed()
self._process_waiters()
return request.deferred
class ManagementInterface(object):
__metaclass__ = Singleton
def __init__(self):
self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections)
## Reply handlers __RH_xxx
def __RH_end_dialog(self, result):
if isinstance(result, Failure):
log.error("failed to end dialog: %s" % result.value)
return False
return True
def end_dialog(self, dialog_id):
cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id)
return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog)
diff --git a/mediaproxy/interfaces/system/__init__.py b/mediaproxy/interfaces/system/__init__.py
index bcd31aa..cecddb1 100644
--- a/mediaproxy/interfaces/system/__init__.py
+++ b/mediaproxy/interfaces/system/__init__.py
@@ -1,5 +1,3 @@
-# Copyright (C) 2008 AG-Projects.
-#
"""Interfaces to interact with the underlying operating system"""
diff --git a/mediaproxy/iputils.py b/mediaproxy/iputils.py
index e1237a2..c172aab 100644
--- a/mediaproxy/iputils.py
+++ b/mediaproxy/iputils.py
@@ -1,47 +1,45 @@
-# Copyright (C) 2008-2014 AG Projects
-#
"""IP address utilities"""
__all__ = ["is_routable_ip"]
import socket
import struct
from application.configuration.datatypes import NetworkRangeList
from mediaproxy.configuration import RelayConfig
# Non routable network addresses (RFC 3330)
#
_non_routable_netlist = [
'0.0.0.0/8',
'10.0.0.0/8',
'127.0.0.0/8',
'169.254.0.0/16',
'172.16.0.0/12',
'192.0.2.0/24',
'192.168.0.0/16',
'198.51.100.0/24',
'203.0.113.0/24',
'224.0.0.0/4',
'255.255.255.255/32'
]
_non_routable_nets = NetworkRangeList(_non_routable_netlist)
def is_routable_ip(ip):
try:
ip_addr = struct.unpack('!L', socket.inet_aton(ip))[0]
except:
return False
for netbase, mask in RelayConfig.routable_private_ranges:
if (ip_addr & mask) == netbase:
return True
for netbase, mask in _non_routable_nets:
if (ip_addr & mask) == netbase:
return False
return True
diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py
index d76ae99..66105a7 100644
--- a/mediaproxy/mediacontrol.py
+++ b/mediaproxy/mediacontrol.py
@@ -1,798 +1,796 @@
-# Copyright (C) 2008-2014 AG Projects
-#
import struct
from time import time
from collections import deque
from operator import attrgetter
from itertools import chain
from application import log
from twisted.internet import reactor
from twisted.internet.interfaces import IReadDescriptor
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python.log import Logger
from zope.interface import implements
from mediaproxy.configuration import RelayConfig
from mediaproxy.interfaces.system import _conntrack
from mediaproxy.iputils import is_routable_ip
from mediaproxy.scheduler import RecurrentCall, KeepRunning
UDP_TIMEOUT_FILE = "/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream"
rtp_payloads = {
0: "G711u", 1: "1016", 2: "G721", 3: "GSM", 4: "G723", 5: "DVI4", 6: "DVI4",
7: "LPC", 8: "G711a", 9: "G722", 10: "L16", 11: "L16", 14: "MPA", 15: "G728",
18: "G729", 25: "CelB", 26: "JPEG", 28: "nv", 31: "H261", 32: "MPV", 33: "MP2T",
34: "H263"
}
class RelayPortsExhaustedError(Exception):
pass
if RelayConfig.relay_ip is None:
raise RuntimeError("Could not determine default host IP; either add default route or specify relay IP manually")
class Address(object):
"""Representation of an endpoint address"""
def __init__(self, host, port, in_use=True, got_rtp=False):
self.host = host
self.port = port
self.in_use = self.__nonzero__() and in_use
self.got_rtp = got_rtp
def __len__(self):
return 2
def __nonzero__(self):
return None not in (self.host, self.port)
def __getitem__(self, index):
return (self.host, self.port)[index]
def __contains__(self, item):
return item in (self.host, self.port)
def __iter__(self):
yield self.host
yield self.port
def __str__(self):
return self.__nonzero__() and ("%s:%d" % (self.host, self.port)) or "Unknown"
def __repr__(self):
return "%s(%r, %r, in_use=%r, got_rtp=%r)" % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp)
def forget(self):
self.host, self.port, self.in_use, self.got_rtp = None, None, False, False
@property
def unknown(self):
return None in (self.host, self.port)
@property
def obsolete(self):
return self.__nonzero__() and not self.in_use
class Counters(dict):
def __add__(self, other):
n = Counters(self)
for k, v in other.iteritems():
n[k] += v
return n
def __iadd__(self, other):
for k, v in other.iteritems():
self[k] += v
return self
@property
def caller_bytes(self):
return self['caller_bytes']
@property
def callee_bytes(self):
return self['callee_bytes']
@property
def caller_packets(self):
return self['caller_packets']
@property
def callee_packets(self):
return self['callee_packets']
@property
def relayed_bytes(self):
return self['caller_bytes'] + self['callee_bytes']
@property
def relayed_packets(self):
return self['caller_packets'] + self['callee_packets']
class StreamListenerProtocol(DatagramProtocol):
noisy = False
def __init__(self):
self.cb_func = None
self.sdp = None
self.send_packet_count = 0
self.stun_queue = []
def datagramReceived(self, data, (host, port)):
if self.cb_func is not None:
self.cb_func(host, port, data)
def set_remote_sdp(self, ip, port):
if is_routable_ip(ip):
self.sdp = ip, port
else:
self.sdp = None
def send(self, data, is_stun, ip=None, port=None):
if is_stun:
self.stun_queue.append(data)
if ip is None or port is None:
# this means that we have not received any packets from this host yet,
# so we have not learnt its address
if self.sdp is None:
# we can't do anything if we haven't received the SDP IP yet or
# it was in a private range
return
ip, port = self.sdp
# we learnt the IP, empty the STUN packets queue
if self.stun_queue:
for data in self.stun_queue:
self.transport.write(data, (ip, port))
self.stun_queue = []
if not is_stun:
if not self.send_packet_count % RelayConfig.userspace_transmit_every:
self.transport.write(data, (ip, port))
self.send_packet_count += 1
def _stun_test(data):
# Check if data is a STUN request and if it's a binding request
if len(data) < 20:
return False, False
msg_type, msg_len, magic = struct.unpack("!HHI", data[:8])
if msg_type & 0xc == 0 and magic == 0x2112A442:
if msg_type == 0x0001:
return True, True
else:
return True, False
else:
return False, False
class MediaSubParty(object):
def __init__(self, substream, listener):
self.substream = substream
self.listener = listener
self.listener.protocol.cb_func = self.got_data
self.remote = Address(None, None)
host = self.listener.protocol.transport.getHost()
self.local = Address(host.host, host.port)
self.timer = None
self.codec = "Unknown"
self.got_stun_probing = False
self.reset()
def reset(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout)
self.remote.in_use = False # keep remote address around but mark it as obsolete
self.remote.got_rtp = False
self.got_stun_probing = False
self.listener.protocol.send_packet_count = 0
def before_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, "on hold timeout", RelayConfig.on_hold_timeout)
def after_hold(self):
if self.timer and self.timer.active():
self.timer.cancel()
if not self.remote.in_use:
self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout)
def got_data(self, host, port, data):
if (host, port) == tuple(self.remote):
if self.remote.obsolete:
# the received packet matches the previously used IP/port,
# which has been made obsolete, so ignore it
return
else:
if self.remote.in_use:
# the received packet is different than the recorded IP/port,
# so we will discard it
return
# we have learnt the remote IP/port
self.remote.host, self.remote.port = host, port
self.remote.in_use = True
log.debug("Got traffic information for stream: %s" % self.substream.stream)
is_stun, is_binding_request = _stun_test(data)
self.substream.send_data(self, data, is_stun)
if not self.remote.got_rtp and not is_stun:
# This is the first RTP packet received
self.remote.got_rtp = True
if self.timer:
if self.timer.active():
self.timer.cancel()
self.timer = None
if self.codec == "Unknown" and self.substream is self.substream.stream.rtp:
try:
pt = ord(data[1]) & 127
except IndexError:
pass
else:
if pt > 95:
self.codec = "Dynamic(%d)" % pt
elif pt in rtp_payloads:
self.codec = rtp_payloads[pt]
else:
self.codec = "Unknown(%d)" % pt
self.substream.check_create_conntrack()
if is_binding_request:
self.got_stun_probing = True
def cleanup(self):
if self.timer and self.timer.active():
self.timer.cancel()
self.timer = None
self.listener.protocol.cb_func = None
self.substream = None
class MediaSubStream(object):
def __init__(self, stream, listener_caller, listener_callee):
self.stream = stream
self.forwarding_rule = None
self.caller = MediaSubParty(self, listener_caller)
self.callee = MediaSubParty(self, listener_callee)
self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0)
@property
def counters(self):
"""Accumulated counters from all the forwarding rules the stream had"""
if self.forwarding_rule is None:
return self._counters
else:
try:
return self._counters + self.forwarding_rule.counters
except _conntrack.Error:
return self._counters
def _stop_relaying(self):
if self.forwarding_rule is not None:
try:
self._counters += self.forwarding_rule.counters
except _conntrack.Error:
pass
self.forwarding_rule = None
def reset(self, party):
if party == "caller":
self.caller.reset()
else:
self.callee.reset()
self._stop_relaying()
def check_create_conntrack(self):
if self.stream.first_media_time is None:
self.stream.first_media_time = time()
if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp:
self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark)
self.forwarding_rule.expired_func = self.conntrack_expired
def send_data(self, source, data, is_stun):
if source is self.caller:
dest = self.callee
else:
dest = self.caller
if dest.remote:
# if we have already learnt the remote address of the destination, use that
ip, port = dest.remote.host, dest.remote.port
dest.listener.protocol.send(data, is_stun, ip, port)
else:
# otherwise use the IP/port specified in the SDP, if public
dest.listener.protocol.send(data, is_stun)
def conntrack_expired(self):
try:
timeout_wait = int(open(UDP_TIMEOUT_FILE).read())
except:
timeout_wait = 0
self.expired("conntrack timeout", timeout_wait)
def expired(self, reason, timeout_wait):
self._stop_relaying()
self.stream.substream_expired(self, reason, timeout_wait)
def cleanup(self):
self.caller.cleanup()
self.callee.cleanup()
self._stop_relaying()
self.stream = None
class MediaParty(object):
def __init__(self, stream):
self.manager = stream.session.manager
self._remote_sdp = None
self.is_on_hold = False
self.uses_ice = False
while True:
self.listener_rtp = None
self.ports = port_rtp, port_rtcp = self.manager.get_ports()
try:
self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip)
except CannotListenError:
if self.listener_rtp is not None:
self.listener_rtp.stopListening()
self.manager.set_bad_ports(self.ports)
log.warn("Cannot use port pair %d/%d" % self.ports)
else:
break
def _get_remote_sdp(self):
return self._remote_sdp
def _set_remote_sdp(self, (ip, port)):
self._remote_sdp = ip, port
self.listener_rtp.protocol.set_remote_sdp(ip, port)
remote_sdp = property(_get_remote_sdp, _set_remote_sdp)
def cleanup(self):
self.listener_rtp.stopListening()
self.listener_rtcp.stopListening()
self.manager.free_ports(self.ports)
self.manager = None
class MediaStream(object):
def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party):
self.is_alive = True
self.session = session
self.media_type = media_type
self.caller = MediaParty(self)
self.callee = MediaParty(self)
self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp)
self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp)
getattr(self, initiating_party).remote_sdp = (media_ip, media_port)
getattr(self, initiating_party).uses_ice = (media_parameters.get("ice", "no") == "yes")
self.check_hold(initiating_party, direction, media_ip)
self.create_time = time()
self.first_media_time = None
self.start_time = None
self.end_time = None
self.status = "active"
self.timeout_wait = 0
def __str__(self):
if self.caller.remote_sdp is None:
src = "Unknown"
else:
src = "%s:%d" % self.caller.remote_sdp
if self.caller.is_on_hold:
src += " ON HOLD"
if self.caller.uses_ice:
src += " (ICE)"
if self.callee.remote_sdp is None:
dst = "Unknown"
else:
dst = "%s:%d" % self.callee.remote_sdp
if self.callee.is_on_hold:
dst += " ON HOLD"
if self.callee.uses_ice:
dst += " (ICE)"
rtp = self.rtp
rtcp = self.rtcp
return "(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)" % (
self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote)
@property
def counters(self):
return self.rtp.counters + self.rtcp.counters
@property
def is_on_hold(self):
return self.caller.is_on_hold or self.callee.is_on_hold
def check_hold(self, party, direction, ip):
previous_hold = self.is_on_hold
party = getattr(self, party)
if direction == "sendonly" or direction == "inactive":
party.is_on_hold = True
elif ip == "0.0.0.0":
party.is_on_hold = True
else:
party.is_on_hold = False
if previous_hold and not self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = "active"
subparty.after_hold()
if not previous_hold and self.is_on_hold:
for substream in [self.rtp, self.rtcp]:
for subparty in [substream.caller, substream.callee]:
self.status = "on hold"
subparty.before_hold()
def reset(self, party, media_ip, media_port):
self.rtp.reset(party)
self.rtcp.reset(party)
getattr(self, party).remote_sdp = (media_ip, media_port)
def substream_expired(self, substream, reason, timeout_wait):
if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice:
log.debug("RTP stream expired for session %s: %s" % (self.session, reason))
reason = "unselected ICE candidate"
if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing:
log.debug("unselected ICE candidate for session %s but no STUN was received" % self.session)
if substream is self.rtcp or (self.is_on_hold and reason=='conntrack timeout'):
# Forget about the remote addresses, this will cause any
# re-occurence of the same traffic to be forwarded again
substream.caller.remote.forget()
substream.caller.listener.protocol.send_packet_count = 0
substream.callee.remote.forget()
substream.callee.listener.protocol.send_packet_count = 0
else:
session = self.session
self.cleanup(reason)
self.timeout_wait = timeout_wait
session.stream_expired(self)
def cleanup(self, status="closed"):
if self.is_alive:
self.is_alive = False
self.status = status
self.caller.cleanup()
self.callee.cleanup()
self.rtp.cleanup()
self.rtcp.cleanup()
self.session = None
self.end_time = time()
class Session(object):
def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark = 0):
self.manager = manager
self.dispatcher = dispatcher
self.call_id = call_id
self.from_tag = from_tag
self.to_tag = None
self.mark = mark
self.from_uri = from_uri
self.to_uri = to_uri
self.caller_ua = None
self.callee_ua = None
self.cseq = None
self.previous_cseq = None
self.streams = {}
self.start_time = None
self.end_time = None
self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq)
def __str__(self):
return "%s: %s (%s) --> %s" % (self.call_id, self.from_uri, self.from_tag, self.to_uri)
def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq):
if self.cseq is None:
old_cseq = (0,0)
else:
old_cseq = self.cseq
if is_caller_cseq:
cseq = (cseq, old_cseq[1])
if self.to_tag is None and to_tag is not None:
self.to_tag = to_tag
else:
cseq = (old_cseq[0], cseq)
if is_downstream:
party = "caller"
if self.caller_ua is None:
self.caller_ua = user_agent
else:
party = "callee"
if self.callee_ua is None:
self.callee_ua = user_agent
if self.cseq is None or cseq > self.cseq:
if not media_list:
return
log.debug("Received new SDP offer")
self.streams[cseq] = new_streams = []
if self.cseq is None:
old_streams = []
else:
old_streams = self.streams[self.cseq]
for media_type, media_ip, media_port, media_direction, media_parameters in media_list:
stream = None
for old_stream in old_streams:
old_remote = getattr(old_stream, party).remote_sdp
if old_remote is not None:
old_ip, old_port = old_remote
else:
old_ip, old_port = None, None
if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))):
stream = old_stream
stream.check_hold(party, media_direction, media_ip)
log.debug("Found matching existing stream: %s" % stream)
break
if stream is None:
stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party)
log.debug("Added new stream: %s" % stream)
if media_port == 0:
stream.cleanup()
log.debug("Stream explicitly closed: %s" % stream)
new_streams.append(stream)
if self.previous_cseq is not None:
for stream in self.streams[self.previous_cseq]:
if stream not in self.streams[self.cseq] + new_streams:
stream.cleanup()
self.previous_cseq = self.cseq
self.cseq = cseq
elif self.cseq == cseq:
log.debug("Received updated SDP answer")
now = time()
if self.start_time is None:
self.start_time = now
current_streams = self.streams[cseq]
for stream in current_streams:
if stream.start_time is None:
stream.start_time = now
if to_tag is not None and not media_list:
return
if len(media_list) < len(current_streams):
for stream in current_streams[len(media_list):]:
log.debug("Stream rejected by not being included in the SDP answer: %s" % stream)
stream.cleanup("rejected")
if stream.start_time is None:
stream.start_time = now
for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list):
if stream.media_type != media_type:
raise ValueError('Media types do not match: "%s" and "%s"' % (stream.media_type, media_type))
if media_port == 0:
log.debug("Stream explicitly rejected: %s" % stream)
stream.cleanup("rejected")
continue
stream.check_hold(party, media_direction, media_ip)
party_info = getattr(stream, party)
party_info.uses_ice = (media_parameters.get("ice", "no") == "yes")
if party_info.remote_sdp is None or party_info.remote_sdp[0] == "0.0.0.0":
party_info.remote_sdp = (media_ip, media_port)
log.debug("Got initial answer from %s for stream: %s" % (party, stream))
else:
if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'):
stream.reset(party, media_ip, media_port)
log.debug("Updated %s for stream: %s" % (party, stream))
else:
log.debug("Unchanged stream: %s" % stream)
if self.previous_cseq is not None:
for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]:
log.debug("Removing old stream: %s" % stream)
stream.cleanup()
else:
log.debug("Received old CSeq %d:%d, ignoring" % cseq)
def get_local_media(self, is_downstream, cseq, is_caller_cseq):
if is_caller_cseq:
pos = 0
else:
pos = 1
try:
cseq = max(key for key in self.streams.keys() if key[pos] == cseq)
except ValueError:
return None
if is_downstream:
retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]]
else:
retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]]
return retval
def cleanup(self):
self.end_time = time()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
for stream in self.streams[cseq]:
stream.cleanup()
def stream_expired(self, stream):
active_streams = set()
for cseq in [self.previous_cseq, self.cseq]:
if cseq is not None:
active_streams.update([stream for stream in self.streams[cseq] if stream.is_alive])
if len(active_streams) == 0:
self.manager.session_expired(self.call_id, self.from_tag)
@property
def duration(self):
if self.start_time is not None:
if self.end_time is not None:
return int(self.end_time - self.start_time)
else:
return int(time() - self.start_time)
else:
return 0
@property
def relayed_bytes(self):
return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues())))
@property
def statistics(self):
all_streams = set(chain(*self.streams.itervalues()))
attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration')
stats = dict((name, getattr(self, name)) for name in attributes)
stats['caller_ua'] = self.caller_ua or 'Unknown'
stats['callee_ua'] = self.callee_ua or 'Unknown'
stats['streams'] = streams = []
stream_attributes = ('media_type', 'status', 'timeout_wait')
for stream in sorted(all_streams, key=attrgetter('start_time')):
info = dict((name, getattr(stream, name)) for name in stream_attributes)
info['caller_codec'] = stream.rtp.caller.codec
info['callee_codec'] = stream.rtp.callee.codec
if stream.start_time is None:
info['start_time'] = info['end_time'] = None
elif self.start_time is None:
info['start_time'] = info['end_time'] = 0
else:
info['start_time'] = max(int(stream.start_time - self.start_time), 0)
if stream.status == 'rejected':
info['end_time'] = info['start_time']
else:
if stream.end_time is None:
info['end_time'] = stats['duration']
else:
info['end_time'] = min(int(stream.end_time - self.start_time), self.duration)
if stream.first_media_time is None:
info['post_dial_delay'] = None
else:
info['post_dial_delay'] = stream.first_media_time - stream.create_time
caller = stream.rtp.caller
callee = stream.rtp.callee
info.update(stream.counters)
info['caller_local'] = str(caller.local)
info['callee_local'] = str(callee.local)
info['caller_remote'] = str(caller.remote)
info['callee_remote'] = str(callee.remote)
streams.append(info)
return stats
class SessionManager(Logger):
implements(IReadDescriptor)
def __init__(self, relay, start_port, end_port):
self.relay = relay
self.ports = deque((i, i+1) for i in xrange(start_port, end_port, 2))
self.bad_ports = deque()
self.sessions = {}
self.watcher = _conntrack.ExpireWatcher()
self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement
self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement
self.bps_relayed = 0
if RelayConfig.traffic_sampling_period > 0:
self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed)
else:
self.speed_calculator = None
reactor.addReader(self)
def _measure_speed(self):
start_time = time()
current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues())
self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period
self.active_byte_counter = current_byte_counter
self.closed_byte_counter = 0
us_taken = int((time() - start_time) * 1000000)
if us_taken > 10000:
log.warn("Aggregate speed calculation time exceeded 10ms: %d us for %d sessions" % (us_taken, len(self.sessions)))
return KeepRunning
# implemented for IReadDescriptor
def fileno(self):
return self.watcher.fd
def doRead(self):
stream = self.watcher.read()
if stream:
stream.expired_func()
def connectionLost(self, reason):
reactor.removeReader(self)
# port management
def get_ports(self):
if len(self.bad_ports) > len(self.ports):
log.debug("Excessive amount of bad ports, doing cleanup")
self.ports.extend(self.bad_ports)
self.bad_ports = deque()
try:
return self.ports.popleft()
except IndexError:
raise RelayPortsExhaustedError()
def set_bad_ports(self, ports):
self.bad_ports.append(ports)
def free_ports(self, ports):
self.ports.append(ports)
# called by higher level
def _find_session_key(self, call_id, from_tag, to_tag):
key_from = (call_id, from_tag)
if key_from in self.sessions:
return key_from
if to_tag:
key_to = (call_id, to_tag)
if key_to in self.sessions:
return key_to
return None
def has_session(self, call_id, from_tag, to_tag=None, **kw):
return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None)
def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
if key:
session = self.sessions[key]
log.debug("updating existing session %s" % session)
is_downstream = (session.from_tag != from_tag) ^ (type == "request")
is_caller_cseq = (session.from_tag == from_tag)
session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq)
elif type == "reply" and not media:
return None
else:
is_downstream = type == "request"
is_caller_cseq = True
session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq)
self.sessions[(call_id, from_tag)] = session
self.relay.add_session(dispatcher)
log.debug("created new session %s" % session)
return session.get_local_media(is_downstream, cseq, is_caller_cseq)
def remove_session(self, call_id, from_tag, to_tag=None, **kw):
key = self._find_session_key(call_id, from_tag, to_tag)
try:
session = self.sessions[key]
except KeyError:
log.warn("The dispatcher tried to remove a session which is no longer present on the relay")
return None
log.debug("removing session %s" % session)
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
reactor.callLater(0, self.relay.remove_session, session.dispatcher)
return session
def session_expired(self, call_id, from_tag):
key = (call_id, from_tag)
try:
session = self.sessions[key]
except KeyError:
log.warn("A session expired that was no longer present on the relay")
return
log.debug("expired session %s" % session)
session.cleanup()
self.closed_byte_counter += session.relayed_bytes
del self.sessions[key]
self.relay.session_expired(session)
self.relay.remove_session(session.dispatcher)
def cleanup(self):
if self.speed_calculator is not None:
self.speed_calculator.cancel()
for key in self.sessions.keys():
self.session_expired(*key)
@property
def statistics(self):
return [session.statistics for session in self.sessions.itervalues()]
@property
def stream_count(self):
stream_count = {}
for session in self.sessions.itervalues():
for stream in set(chain(*session.streams.itervalues())):
if stream.is_alive:
stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1
return stream_count
diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py
index 56d8283..277e59f 100644
--- a/mediaproxy/relay.py
+++ b/mediaproxy/relay.py
@@ -1,388 +1,386 @@
-# Copyright (C) 2008-2014 AG Projects
-#
"""Implementation of the MediaProxy relay"""
import cjson
import signal
import resource
from time import time
try: from twisted.internet import epollreactor; epollreactor.install()
except: raise RuntimeError("mandatory epoll reactor support is missing from the twisted framework")
from application import log
from application.process import process
from gnutls.errors import CertificateError, CertificateSecurityError
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError
from twisted.internet.protocol import ClientFactory
from twisted.internet.defer import DeferredList, succeed
from twisted.internet import reactor
from twisted.python import failure
from twisted.names import dns
from twisted.names.client import lookupService
from twisted.names.error import DomainError
from mediaproxy import __version__
from mediaproxy.configuration import RelayConfig
from mediaproxy.headers import DecodingDict, DecodingError
from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError
from mediaproxy.scheduler import RecurrentCall, KeepRunning
from mediaproxy.tls import X509Credentials
## Increase the system limit for the maximum number of open file descriptors
## to be able to handle connections to all ports in port_range
try:
fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000
resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit))
except ValueError:
raise RuntimeError("Cannot set resource limit for maximum open file descriptors to %d" % fd_limit)
else:
new_limits = resource.getrlimit(resource.RLIMIT_NOFILE)
if new_limits < (fd_limit, fd_limit):
raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit))
else:
log.msg("Set resource limit for maximum open file descriptors to %d" % fd_limit)
class RelayClientProtocol(LineOnlyReceiver):
noisy = False
required_headers = {'update': set(['call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type']),
'remove': set(['call_id', 'from_tag']),
'summary': set(),
'sessions': set()}
def __init__(self):
self.command = None
self.seq = None
self._connection_watcher = None
self._queued_keepalives = 0
def _send_keepalive(self):
if self._queued_keepalives >= 3:
# 3 keepalives in a row didn't get an answer. assume connection is down.
log.error("missed 3 keepalive answers in a row. assuming the connection is down.")
# do not use loseConnection() as it waits to flush the output buffers.
reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError()))
return None
self.transport.write("ping\r\n")
self._queued_keepalives += 1
return KeepRunning
def connectionMade(self):
peer = self.transport.getPeer()
log.debug("Connected to dispatcher at %s:%d" % (peer.host, peer.port))
if RelayConfig.passport is not None:
peer_cert = self.transport.getPeerCertificate()
if not RelayConfig.passport.accept(peer_cert):
self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted'))
self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive)
def connectionLost(self, reason):
if self._connection_watcher is not None:
self._connection_watcher.cancel()
self._connection_watcher = None
self._queued_keepalives = 0
def lineReceived(self, line):
if line == 'pong':
self._queued_keepalives -= 1
return
if self.command is None:
try:
command, seq = line.split()
except ValueError:
log.error("Could not decode command/sequence number pair from dispatcher: %s" % line)
return
if command in self.required_headers:
self.command = command
self.seq = seq
self.headers = DecodingDict()
else:
log.error("Unknown command: %s" % command)
self.transport.write("%s error\r\n" % seq)
elif line == "":
try:
missing_headers = self.required_headers[self.command].difference(self.headers)
if missing_headers:
for header in missing_headers:
log.error("Missing mandatory header '%s' from '%s' command" % (header, self.command))
response = "error"
else:
try:
response = self.factory.parent.got_command(self.factory.host, self.command, self.headers)
except:
log.err()
response = "error"
finally:
self.transport.write("%s %s\r\n" % (self.seq, response))
self.command = None
else:
try:
name, value = line.split(": ", 1)
except ValueError:
log.error("Unable to parse header: %s" % line)
else:
try:
self.headers[name] = value
except DecodingError, e:
log.error("Could not decode header: %s" % e)
class DispatcherConnectingFactory(ClientFactory):
noisy = False
protocol = RelayClientProtocol
def __init__(self, parent, host, port):
self.parent = parent
self.host = (host, port)
self.delayed = None
self.connection_lost = False
def __eq__(self, other):
return self.host == other.host
def clientConnectionFailed(self, connector, reason):
log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value))
if self.parent.connector_needs_reconnect(connector):
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
def clientConnectionLost(self, connector, reason):
self.cancel_delayed()
if reason.type != ConnectionDone:
log.error("Connection with dispatcher at %(host)s:%(port)d was lost: %%s" % connector.__dict__ % reason.value)
else:
log.msg("Connection with dispatcher at %(host)s:%(port)d was closed" % connector.__dict__)
if self.parent.connector_needs_reconnect(connector):
if isinstance(reason.value, CertificateError) or self.connection_lost:
self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect)
else:
self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect)
self.connection_lost = True
def buildProtocol(self, addr):
self.delayed = reactor.callLater(5, self._connected_successfully)
return ClientFactory.buildProtocol(self, addr)
def _connected_successfully(self):
self.connection_lost = False
def cancel_delayed(self):
if self.delayed:
if self.delayed.active():
self.delayed.cancel()
self.delayed = None
class SRVMediaRelayBase(object):
def __init__(self):
self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup)
self._do_lookup()
def _do_lookup(self):
defers = []
for addr, port, is_domain in RelayConfig.dispatchers:
if is_domain:
defer = lookupService("_sip._udp.%s" % addr)
defer.addCallback(self._cb_got_srv, port)
defer.addErrback(self._eb_no_srv, addr, port)
defers.append(defer)
else:
defers.append(succeed((addr, port)))
defer = DeferredList(defers)
defer.addCallback(self._cb_got_all)
return KeepRunning
def _cb_got_srv(self, (answers, auth, add), port):
for answer in answers:
if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."):
return str(answer.payload.target), port
raise DomainError
def _eb_no_srv(self, failure, addr, port):
failure.trap(DomainError)
return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr)
def _eb_no_dns(self, failure, addr):
failure.trap(DNSLookupError)
log.error("Could resolve neither SRV nor A record for '%s'" % addr)
def _cb_got_all(self, results):
if not self.shutting_down:
dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def run(self):
process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)
reactor.run(installSignalHandlers=False)
def _handle_SIGHUP(self, *args):
log.msg("Received SIGHUP, shutting down after all sessions have expired.")
reactor.callFromThread(self.shutdown, graceful=True)
def _handle_SIGINT(self, *args):
if process._daemon:
log.msg("Received SIGINT, shutting down.")
else:
log.msg("Received KeyboardInterrupt, exiting.")
reactor.callFromThread(self.shutdown)
def _handle_SIGTERM(self, *args):
log.msg("Received SIGTERM, shutting down.")
reactor.callFromThread(self.shutdown)
def shutdown(self, graceful=False):
raise NotImplementedError()
def on_shutdown(self):
pass
def _shutdown(self):
reactor.stop()
self.on_shutdown()
try:
from mediaproxy.sipthor import SIPThorMediaRelayBase
MediaRelayBase = SIPThorMediaRelayBase
except ImportError:
MediaRelayBase = SRVMediaRelayBase
class MediaRelay(MediaRelayBase):
def __init__(self):
self.cred = X509Credentials(cert_name='relay')
self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end)
self.dispatchers = set()
self.dispatcher_session_count = {}
self.dispatcher_connectors = {}
self.old_connectors = {}
self.shutting_down = False
self.graceful_shutdown = False
self.start_time = time()
MediaRelayBase.__init__(self)
@property
def status(self):
if self.graceful_shutdown or self.shutting_down:
return 'halting'
else:
return 'active'
def update_dispatchers(self, dispatchers):
dispatchers = set(dispatchers)
for new_dispatcher in dispatchers.difference(self.dispatchers):
if new_dispatcher in self.old_connectors.iterkeys():
log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher)
self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher)
else:
log.debug('Adding new dispatcher at %s:%d' % new_dispatcher)
dispatcher_addr, dispatcher_port = new_dispatcher
factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port)
self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.cred)
for old_dispatcher in self.dispatchers.difference(dispatchers):
log.debug('Removing old dispatcher at %s:%d' % old_dispatcher)
self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher)
self._check_disconnect(old_dispatcher)
self.dispatchers = dispatchers
def got_command(self, dispatcher, command, headers):
if command == "summary":
summary = {'ip' : RelayConfig.relay_ip,
'version' : __version__,
'status' : self.status,
'uptime' : int(time() - self.start_time),
'session_count' : len(self.session_manager.sessions),
'stream_count' : self.session_manager.stream_count,
'bps_relayed' : self.session_manager.bps_relayed}
return cjson.encode(summary)
elif command == "sessions":
return cjson.encode(self.session_manager.statistics)
elif command == "update":
if self.graceful_shutdown or self.shutting_down:
if not self.session_manager.has_session(**headers):
log.debug("cannot add new session: media-relay is shutting down")
return 'halting'
try:
local_media = self.session_manager.update_session(dispatcher, **headers)
except RelayPortsExhaustedError:
log.error("Could not reserve relay ports for session, all allocated ports are being used")
return "error"
if local_media:
return " ".join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media])
else: # remove
session = self.session_manager.remove_session(**headers)
if session is None:
return "error"
else:
return cjson.encode(session.statistics)
def session_expired(self, session):
connector = self.dispatcher_connectors.get(session.dispatcher)
if connector is None:
connector = self.old_connectors.get(session.dispatcher)
if connector and connector.state == "connected":
connector.transport.write(" ".join(["expired", cjson.encode(session.statistics)]) + "\r\n")
else:
log.warn("dispatcher for expired session is no longer online, statistics are lost!")
def add_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1
def remove_session(self, dispatcher):
self.dispatcher_session_count[dispatcher] -= 1
if self.dispatcher_session_count[dispatcher] == 0:
del self.dispatcher_session_count[dispatcher]
if self.graceful_shutdown and not self.dispatcher_session_count:
self.shutdown()
elif dispatcher in self.old_connectors:
self._check_disconnect(dispatcher)
def _check_disconnect(self, dispatcher):
connector = self.old_connectors[dispatcher]
if self.dispatcher_session_count.get(dispatcher, 0) == 0:
old_state = connector.state
connector.factory.cancel_delayed()
connector.disconnect()
if old_state == "disconnected":
del self.old_connectors[dispatcher]
if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown()
def connector_needs_reconnect(self, connector):
if connector in self.dispatcher_connectors.values():
return True
else:
for dispatcher, old_connector in self.old_connectors.items():
if old_connector is connector:
if self.dispatcher_session_count.get(dispatcher, 0) > 0:
return True
else:
del self.old_connectors[dispatcher]
break
if self.shutting_down:
if len(self.old_connectors) == 0:
self._shutdown()
return False
def shutdown(self, graceful=False):
if graceful:
self.graceful_shutdown = True
if self.dispatcher_session_count:
return
if not self.shutting_down:
self.shutting_down = True
self.srv_monitor.cancel()
self.session_manager.cleanup()
if len(self.dispatcher_connectors) + len(self.old_connectors) == 0:
self._shutdown()
else:
self.update_dispatchers([])
diff --git a/mediaproxy/scheduler.py b/mediaproxy/scheduler.py
index 48eb6db..dfa4a3d 100644
--- a/mediaproxy/scheduler.py
+++ b/mediaproxy/scheduler.py
@@ -1,48 +1,46 @@
-# Copyright (C) 2007-2014 Dan Pascu
-#
"""Schedule calls on the twisted reactor"""
__all__ = ['RecurrentCall', 'KeepRunning']
from time import time
class KeepRunning:
"""Return this class from a recurrent function to indicate that it should keep running"""
pass
class RecurrentCall(object):
"""Execute a function repeatedly at the given interval, until signaled to stop"""
def __init__(self, period, func, *args, **kwargs):
from twisted.internet import reactor
self.func = func
self.args = args
self.kwargs = kwargs
self.period = period
self.now = None
self.next = None
self.callid = reactor.callLater(period, self)
def __call__(self):
from twisted.internet import reactor
self.callid = None
if self.now is None:
self.now = time()
self.next = self.now + self.period
else:
self.now, self.next = self.next, self.next + self.period
result = self.func(*self.args, **self.kwargs)
if result is KeepRunning:
delay = max(self.next-time(), 0)
self.callid = reactor.callLater(delay, self)
def cancel(self):
if self.callid is not None:
try:
self.callid.cancel()
except ValueError:
pass
self.callid = None
diff --git a/mediaproxy/sipthor.py b/mediaproxy/sipthor.py
index fe12300..5d2ed87 100644
--- a/mediaproxy/sipthor.py
+++ b/mediaproxy/sipthor.py
@@ -1,64 +1,62 @@
-# Copyright (C) 2008-2014 AG Projects
-#
"""SIP Thor backend"""
from application import log
from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL
from thor.entities import ThorEntities, GenericThorEntity
from thor.eventservice import EventServiceClient, ThorEvent
from thor.tls import X509Credentials
from mediaproxy import __version__
from mediaproxy.configuration import ThorNetworkConfig
from mediaproxy.configuration.datatypes import DispatcherIPAddress
from mediaproxy.relay import SRVMediaRelayBase
if ThorNetworkConfig.domain is None:
## SIP Thor is installed but disabled. Fake an ImportError to start in standalone media relay mode.
log.warn("SIP Thor is installed but disabled from the configuration")
raise ImportError("SIP Thor is disabled")
class SIPThorMediaRelayBase(EventServiceClient, SRVMediaRelayBase):
topics = ["Thor.Members"]
def __init__(self):
self.node = GenericThorEntity(ThorNetworkConfig.node_ip, ["media_relay"], version=__version__)
self.presence_message = ThorEvent('Thor.Presence', self.node.id)
self.shutdown_message = ThorEvent('Thor.Leave', self.node.id)
credentials = X509Credentials(cert_name='relay')
credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)
self.sipthor_dispatchers = []
self.additional_dispatchers = []
EventServiceClient.__init__(self, ThorNetworkConfig.domain, credentials)
SRVMediaRelayBase.__init__(self)
def handle_event(self, event):
if not self.shutting_down:
sip_proxy_ips = [node.ip for node in ThorEntities(event.message, role="sip_proxy")]
self.sipthor_dispatchers = [(ip, DispatcherIPAddress.default_port) for ip in sip_proxy_ips]
self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers)
def _cb_got_all(self, results):
if not self.shutting_down:
self.additional_dispatchers = [result[1] for result in results if result[0] and result[1] is not None]
self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers)
def update_dispatchers(self, dispatchers):
raise NotImplementedError()
def _handle_SIGHUP(self, *args):
SRVMediaRelayBase._handle_SIGHUP(self, *args)
def _handle_SIGINT(self, *args):
SRVMediaRelayBase._handle_SIGINT(self, *args)
def _handle_SIGTERM(self, *args):
SRVMediaRelayBase._handle_SIGTERM(self, *args)
def shutdown(self, graceful=False):
raise NotImplementedError()
diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py
index 091c0f2..3f92ece 100644
--- a/mediaproxy/tls.py
+++ b/mediaproxy/tls.py
@@ -1,86 +1,84 @@
-# Copyright (C) 2007-2014 AG Projects.
-#
"""TLS support"""
__all__ = ['X509Credentials']
import os
import stat
from application.process import process
from gnutls import crypto
from gnutls.interfaces import twisted
from mediaproxy.configuration import TLSConfig
class FileDescriptor(object):
def __init__(self, name, type):
certs_path = os.path.normpath(TLSConfig.certs_path)
self.path = os.path.join(certs_path, name)
self.klass = type
self.timestamp = 0
self.object = None
def get(self):
path = process.config_file(self.path)
if path is None:
raise RuntimeError("missing or unreadable file: %s" % self.path)
mtime = os.stat(path)[stat.ST_MTIME]
if self.timestamp < mtime:
f = open(path)
try:
self.object = self.klass(f.read())
self.timestamp = mtime
finally:
f.close()
return self.object
class X509Entity(object):
type = None
def __init__(self, name_attr):
self.name_attr = name_attr
self.descriptors = {}
def __get__(self, obj, type_=None):
name = getattr(obj or type_, self.name_attr, None)
if name is None:
return None
descriptor = self.descriptors.setdefault(name, FileDescriptor(name, self.type))
return descriptor.get()
def __set__(self, obj, value):
raise AttributeError("cannot set attribute")
def __delete__(self, obj):
raise AttributeError("cannot delete attribute")
class X509Certificate(X509Entity):
type = crypto.X509Certificate
class X509PrivateKey(X509Entity):
type = crypto.X509PrivateKey
class X509CRL(X509Entity):
type = crypto.X509CRL
class X509Credentials(twisted.X509Credentials):
"""SIPThor X509 credentials"""
X509cert_name = None ## will be defined by each instance
X509key_name = None ## will be defined by each instance
X509ca_name = 'ca.pem'
X509crl_name = 'crl.pem'
X509cert = X509Certificate(name_attr='X509cert_name')
X509key = X509PrivateKey(name_attr='X509key_name')
X509ca = X509Certificate(name_attr='X509ca_name')
X509crl = X509CRL(name_attr='X509crl_name')
def __init__(self, cert_name):
self.X509cert_name = '%s.crt' % cert_name
self.X509key_name = '%s.key' % cert_name
twisted.X509Credentials.__init__(self, self.X509cert, self.X509key, [self.X509ca], [self.X509crl])
self.verify_peer = True
self.verify_period = TLSConfig.verify_interval