diff --git a/media-dispatcher b/media-dispatcher index 943ffbd..715bd63 100644 --- a/media-dispatcher +++ b/media-dispatcher @@ -1,60 +1,63 @@ #!/usr/bin/env python if __name__ == '__main__': import sys from optparse import OptionParser from application.process import process, ProcessError from application.configuration import ConfigFile, datatypes from application import log import mediaproxy name = 'media-dispatcher' fullname = 'MediaProxy Dispatcher' description = 'MediaProxy Dispatcher component' - default_pid = mediaproxy.runtime_directory + '/dispatcher.pid' + process.configuration.user_directory = None + process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory + process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory + + default_pid = process.runtime.file('{}.pid'.format(name)) parser = OptionParser(version="%%prog %s" % mediaproxy.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)") parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File") (options, args) = parser.parse_args() - process.system_config_directory = mediaproxy.config_directory config_file = ConfigFile(mediaproxy.configuration_file) log.level.current = config_file.get_setting('Dispatcher', 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) try: - process.runtime_directory = mediaproxy.runtime_directory - except ProcessError, e: + process.runtime.create_directory() + except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) if options.fork: try: process.daemonize(options.pid_file) - except ProcessError, e: + except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) from mediaproxy.dispatcher import Dispatcher if not options.fork: from application.debug.memory import memory_dump try: dispatcher = Dispatcher() - except Exception, e: + except Exception as e: log.critical('Failed to create %s: %s' % (fullname, e)) - if e.__class__ is not RuntimeError: + if type(e) is not RuntimeError: log.exception() sys.exit(1) dispatcher.run() if not options.fork: #from application.debug.memory import memory_dump memory_dump() diff --git a/media-relay b/media-relay index 629b395..39a8d01 100644 --- a/media-relay +++ b/media-relay @@ -1,102 +1,99 @@ #!/usr/bin/env python if __name__ == '__main__': import errno import sys import subprocess from optparse import OptionParser from application import log from application.configuration import ConfigFile, datatypes from application.process import process, ProcessError from application.version import Version import mediaproxy IP_FORWARD_FILE = '/proc/sys/net/ipv4/ip_forward' CONNTRACK_ACCT_FILE = '/proc/sys/net/netfilter/nf_conntrack_acct' KERNEL_VERSION_FILE = '/proc/sys/kernel/osrelease' name = 'media-relay' fullname = 'MediaProxy Relay' description = 'MediaProxy Relay component' - default_pid = mediaproxy.runtime_directory + '/relay.pid' + process.configuration.user_directory = None + process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory + process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory + + default_pid = process.runtime.file('{}.pid'.format(name)) parser = OptionParser(version="%%prog %s" % mediaproxy.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)") parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File") (options, args) = parser.parse_args() if not sys.platform.startswith('linux'): log.critical('Cannot start %s. A Linux host is required for operation.' % fullname) sys.exit(1) try: subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'}) except OSError, e: log.critical('Cannot start %s: failed to load the ip_tables kernel module: %s' % (fullname, e)) sys.exit(1) try: kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip()) except (OSError, IOError, ValueError): log.critical('Could not determine Linux kernel version') sys.exit(1) if kernel_version < Version(2, 6, 18): log.critical('Linux kernel version 2.6.18 or newer is required to run the media relay') sys.exit(1) try: ip_forward = bool(int(open(IP_FORWARD_FILE).read())) except (OSError, IOError, ValueError): ip_forward = False if not ip_forward: log.critical('IP forwarding is not available or not enabled (check %s)' % IP_FORWARD_FILE) sys.exit(1) try: with open(CONNTRACK_ACCT_FILE, 'w') as acct_file: acct_file.write('1') except (IOError, OSError), e: if e.errno != errno.ENOENT: log.critical('Could not enable conntrack rule counters (check %s): %s' % (CONNTRACK_ACCT_FILE, e)) sys.exit(1) - process.system_config_directory = mediaproxy.config_directory config_file = ConfigFile(mediaproxy.configuration_file) log.level.current = config_file.get_setting('Relay', 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) - try: - process.runtime_directory = mediaproxy.runtime_directory - except ProcessError, e: - log.critical('Cannot start %s: %s' % (fullname, e)) - sys.exit(1) - if options.fork: try: process.daemonize(options.pid_file) except ProcessError, e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) try: from mediaproxy.relay import MediaRelay if not options.fork: from application.debug.memory import memory_dump relay = MediaRelay() except Exception, e: log.critical('Failed to create %s: %s' % (fullname, e)) if e.__class__ is not RuntimeError: log.exception() sys.exit(1) relay.run() if not options.fork: #from application.debug.memory import memory_dump memory_dump() diff --git a/mediaproxy/__init__.py b/mediaproxy/__init__.py index 0b606bf..af3c008 100644 --- a/mediaproxy/__init__.py +++ b/mediaproxy/__init__.py @@ -1,8 +1,6 @@ -"""Mediaproxy implements a media relay for SIP calls""" +__version__ = '2.6.6' -__version__ = "2.6.6" - -config_directory = '/etc/mediaproxy' -runtime_directory = '/run/mediaproxy' +# mediaproxy configuration and runtime settings +mediaproxy_subdirectory = 'mediaproxy' configuration_file = 'config.ini' diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 1f6e6f9..0bba63d 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,533 +1,533 @@ """Implementation of the MediaProxy dispatcher""" import random import signal import cPickle as pickle import cjson from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials class ControlProtocol(LineOnlyReceiver): noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason): log.debug("Connection to %s lost: %s" % (self.description, reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + "\r\n") def _relay_error(self, failure): failure.trap(RelayError) log.error(failure.value) self.transport.write("error\r\n") def _catch_all(self, failure): log.error(failure.getBriefTraceback()) self.transport.write("error\r\n") def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._relay_error) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): description = "OpenSIPS" def __init__(self): self.line_buf = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == "": if self.line_buf: self.in_progress += 1 defer = self.factory.dispatcher.send_command(self.line_buf[0], self.line_buf[1:]) self._add_callbacks(defer) self.line_buf = [] elif not line.endswith(": "): self.line_buf.append(line) class ManagementControlProtocol(ControlProtocol): description = "Management interface client" def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ["quit", "exit"]: self.transport.loseConnection() elif line == "summary": defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == "sessions": defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == "version": self.reply(__version__) else: log.error("Unknown command on management interface: %s" % line) self.reply("error") class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): prot = Factory.buildProtocol(self, addr) self.protocols.append(prot) return prot def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): noisy = False MAX_LENGTH = 4096*1024 ## (4MB) def __init__(self): self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command, headers): log.debug('Issuing "%s" command to relay at %s' % (command, self.ip)) seq = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, seq, defer) self.commands[seq] = (command, defer, timer) self.transport.write("\r\n".join([" ".join([command, seq])] + headers + ["", ""])) return defer def _timeout(self, seq, defer): del self.commands[seq] defer.errback(RelayError("Relay at %s timed out" % self.ip)) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(" ", 1) except ValueError: first = line rest = "" if first == "expired": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: log.error("Unknown session with call_id %s expired at relay %s" % (call_id, self.ip)) return if session.relay_ip != self.ip: log.error("session with call_id %s expired at relay %s, but is actually at relay %s, ignoring" % (call_id, self.ip, session.relay_ip)) return all_streams_ice = all(stream_info["status"] == "unselected ICE candidate" for stream_info in stats["streams"]) if all_streams_ice: log.info('session with call_id %s from relay %s removed because ICE was used' % (call_id, session.relay_ip)) stats["timed_out"] = False else: log.info('session with call_id %s from relay %s did timeout' % (call_id, session.relay_ip)) stats["timed_out"] = True stats["dialog_id"] = session.dialog_id stats["all_streams_ice"] = all_streams_ice self.factory.dispatcher.update_statistics(stats) if session.dialog_id is not None and stats["start_time"] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == "ping": if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.transport.write("pong\r\n") return try: command, defer, timer = self.commands.pop(first) except KeyError: log.error("Got unexpected response from relay at %s: %s" % (self.ip, line)) return timer.cancel() if rest == "error": defer.errback(RelayError("Received error from relay at %s in response to `%s' command" % (self.ip, command))) elif rest == "halting": self.halting = True defer.errback(RelayError("Relay at %s is shutting down" % self.ip)) elif command == "remove": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats["dialog_id"] = session.dialog_id stats["timed_out"] = False self.factory.dispatcher.update_statistics(stats) del self.factory.sessions[call_id] defer.callback("removed") else: # update command defer.callback(rest) def connectionLost(self, reason): if reason.type == ConnectionDone: log.info('Connection with relay at %s was closed' % self.ip) elif reason.type == ConnectionReplaced: log.warning('Old connection with relay at %s was lost' % self.ip) else: log.error("Connection with relay at %s was lost: %s" % (self.ip, reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() defer.errback(RelayError("Relay at %s disconnected" % self.ip)) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) class RelaySession(object): def __init__(self, relay_ip, command_headers): self.relay_ip = relay_ip self.dialog_id = command_headers.get('dialog_id') self.expire_time = None class RelayFactory(Factory): noisy = False protocol = RelayServerProtocol def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False - state_file = process.runtime_file("dispatcher_state") + state_file = process.runtime.file('dispatcher_state') try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0))) return KeepRunning def buildProtocol(self, addr): ip = addr.host log.debug('Connection from relay at %s' % ip) prot = Factory.buildProtocol(self, addr) prot.ip = ip return prot def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: log.warning('Relay at %s reconnected, closing old connection' % relay.ip) reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected"))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command('sessions', []) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session['call_id'] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: log.warning('Session %s is no longer on relay %s, statistics are probably lost' % (session_id, relay_ip)) if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command, headers): try: parsed_headers = dict(header.split(': ', 1) for header in headers) except Exception: raise RelayError('Could not parse headers from OpenSIPs') try: call_id = parsed_headers['call_id'] except KeyError: raise RelayError('Missing call_id header') session = self.sessions.get(call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: raise RelayError('Relay for this session (%s) is no longer connected' % relay) return self.relays[relay].send_command(command, headers) # We do not have a session for this call_id or the session is already expired if command == 'update': preferred_relay = parsed_headers.get("media_relay") try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warning('user requested media_relay %s is not available' % preferred_relay) defer = self._try_next(try_relays, command, headers) defer.addCallback(self._add_session, try_relays, call_id, parsed_headers) return defer elif command == 'remove' and session: # This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[call_id] return 'removed' else: raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id)) def _add_session(self, result, try_relays, call_id, parsed_headers): self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers) return result def _relay_error(self, failure, try_relays, command, headers): failure.trap(RelayError) failed_relay = try_relays.popleft() log.warning('Relay %s failed: %s' % (failed_relay, failure.value)) return self._try_next(try_relays, command, headers) def _try_next(self, try_relays, command, headers): if len(try_relays) == 0: raise RelayError('No suitable relay found') defer = try_relays[0].send_command(command, headers) defer.addErrback(self._relay_error, try_relays, command, headers) return defer def get_summary(self): defer = DeferredList([relay.send_command('summary', []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, ip): log.error('Error processing query at relay %s: %s' % (ip, failure.value)) return cjson.encode(dict(status="error", ip=ip)) def _got_summaries(self, results): return '[%s]' % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): defer = DeferredList([relay.send_command('sessions', []) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer def _got_statistics(self, results): return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug('Doing cleanup for old relay %s' % ip) del self.cleanup_timers[ip] for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]: del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): - pickle.dump(self.sessions, open(process.runtime_file('dispatcher_state'), 'w')) + pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w')) class Dispatcher(object): def __init__(self): self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) - socket_path = process.runtime_file(DispatcherConfig.socket_path) + socket_path = process.runtime.file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): log.debug('Running with the twisted {0.__class__.__name__}'.format(reactor)) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command, headers): return maybeDeferred(self.relay_factory.send_command, command, headers) def update_statistics(self, stats): log.debug('Got statistics: %s' % stats) if stats['start_time'] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.exception('An unhandled error occurred while doing accounting: %s' % e) def _handle_SIGHUP(self, *args): log.info('Received SIGHUP, shutting down.') reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process.daemon: log.info('Received SIGINT, shutting down.') else: log.info('Received KeyboardInterrupt, exiting.') reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.info('Received SIGTERM, shutting down.') reactor.callFromThread(self._shutdown) def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/interfaces/accounting/radius.py b/mediaproxy/interfaces/accounting/radius.py index 9ddcd92..687e9e8 100644 --- a/mediaproxy/interfaces/accounting/radius.py +++ b/mediaproxy/interfaces/accounting/radius.py @@ -1,124 +1,124 @@ """Implementation of RADIUS accounting""" from application import log from application.process import process from application.python.queue import EventQueue import pyrad.client import pyrad.dictionary from mediaproxy.configuration import RadiusConfig try: from pyrad.dictfile import DictFile except ImportError: # helper class to make pyrad support the $INCLUDE statement in dictionary files class RadiusDictionaryFile(object): def __init__(self, base_file_name): self.file_names = [base_file_name] self.fd_stack = [open(base_file_name)] def readlines(self): while True: line = self.fd_stack[-1].readline() if line: if line.startswith('$INCLUDE'): file_name = line.rstrip('\n').split(None, 1)[1] if file_name not in self.file_names: self.file_names.append(file_name) self.fd_stack.append(open(file_name)) continue else: yield line else: self.fd_stack.pop() if len(self.fd_stack) == 0: return else: del DictFile class RadiusDictionaryFile(str): pass class Accounting(object): def __init__(self): self.handler = RadiusAccounting() def start(self): self.handler.start() def do_accounting(self, stats): self.handler.put(stats) def stop(self): self.handler.stop() self.handler.join() class RadiusAccounting(EventQueue, pyrad.client.Client): def __init__(self): - main_config_file = process.config_file(RadiusConfig.config_file) + main_config_file = process.configuration.file(RadiusConfig.config_file) if main_config_file is None: raise RuntimeError('Cannot find the radius configuration file: %r' % RadiusConfig.config_file) try: config = dict(line.rstrip('\n').split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith('#')) secrets = dict(line.rstrip('\n').split(None, 1) for line in open(config['servers']) if len(line.split(None, 1)) == 2 and not line.startswith('#')) server = config['acctserver'] try: server, acctport = server.split(':') acctport = int(acctport) except ValueError: acctport = 1813 secret = secrets[server] dicts = [RadiusDictionaryFile(config['dictionary'])] if RadiusConfig.additional_dictionary: - additional_dictionary = process.config_file(RadiusConfig.additional_dictionary) + additional_dictionary = process.configuration.file(RadiusConfig.additional_dictionary) if additional_dictionary: dicts.append(RadiusDictionaryFile(additional_dictionary)) else: log.warning('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary) raddict = pyrad.dictionary.Dictionary(*dicts) timeout = int(config['radius_timeout']) retries = int(config['radius_retries']) except Exception: log.critical('cannot read the RADIUS configuration file') raise pyrad.client.Client.__init__(self, server, 1812, acctport, secret, raddict) self.timeout = timeout self.retries = retries if 'bindaddr' in config and config['bindaddr'] != '*': self.bind((config['bindaddr'], 0)) EventQueue.__init__(self, self.do_accounting) def do_accounting(self, stats): attrs = {} attrs['Acct-Status-Type'] = 'Update' attrs['User-Name'] = 'mediaproxy@default' attrs['Acct-Session-Id'] = stats['call_id'] attrs['Acct-Session-Time'] = stats['duration'] attrs['Acct-Input-Octets'] = sum(stream_stats['caller_bytes'] for stream_stats in stats['streams']) attrs['Acct-Output-Octets'] = sum(stream_stats['callee_bytes'] for stream_stats in stats['streams']) attrs['Sip-From-Tag'] = stats['from_tag'] attrs['Sip-To-Tag'] = stats['to_tag'] or '' attrs['NAS-IP-Address'] = stats['streams'][0]['caller_local'].split(':')[0] attrs['Sip-User-Agents'] = (stats['caller_ua'] + '+' + stats['callee_ua'])[:253] attrs['Sip-Applications'] = ', '.join(sorted(set(stream['media_type'] for stream in stats['streams'] if stream['start_time'] != stream['end_time'])))[:253] attrs['Media-Codecs'] = ', '.join(stream['caller_codec'] for stream in stats['streams'])[:253] if stats['timed_out'] and not stats.get('all_streams_ice', False): attrs['Media-Info'] = 'timeout' elif stats.get('all_streams_ice', False): attrs['Media-Info'] = 'ICE session' else: attrs['Media-Info'] = '' for stream in stats['streams']: if stream['post_dial_delay'] is not None: attrs['Acct-Delay-Time'] = int(stream['post_dial_delay']) break try: self.SendPacket(self.CreateAcctPacket(**attrs)) except Exception, e: log.error('Failed to send radius accounting record: %s' % e) diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py index 8b8cc66..ce2f3cf 100644 --- a/mediaproxy/interfaces/opensips.py +++ b/mediaproxy/interfaces/opensips.py @@ -1,240 +1,240 @@ import json import socket import urlparse from abc import ABCMeta, abstractmethod, abstractproperty from application import log from application.python.types import Singleton from application.process import process from application.system import unlink from random import getrandbits from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.python.failure import Failure from mediaproxy.configuration import OpenSIPSConfig class Error(Exception): pass class TimeoutError(Error): pass class OpenSIPSError(Error): pass class NegativeReplyError(OpenSIPSError): def __init__(self, code, message): super(NegativeReplyError, self).__init__(code, message) self.code = code self.message = message def __repr__(self): return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) def __str__(self): return '[{0.code}] {0.message}'.format(self) class Request(object): __metaclass__ = ABCMeta method = abstractproperty() @abstractmethod def __init__(self, *args): self.id = '{:x}'.format(getrandbits(32)) self.args = list(args) self.deferred = defer.Deferred() @property def __data__(self): return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) @abstractmethod def process_response(self, response): raise NotImplementedError # noinspection PyAbstractClass class BooleanRequest(Request): """A request that returns True if successful, False otherwise""" def process_response(self, response): return not isinstance(response, Failure) class AddressReload(BooleanRequest): method = 'address_reload' def __init__(self): super(AddressReload, self).__init__() class DomainReload(BooleanRequest): method = 'domain_reload' def __init__(self): super(DomainReload, self).__init__() class EndDialog(BooleanRequest): method = 'dlg_end_dlg' def __init__(self, dialog_id): super(EndDialog, self).__init__(dialog_id) class RefreshWatchers(BooleanRequest): method = 'refresh_watchers' def __init__(self, account, refresh_type): super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) class UpdateSubscriptions(BooleanRequest): method = 'rls_update_subscriptions' def __init__(self, account): super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) class GetOnlineDevices(Request): method = 'ul_show_contact' def __init__(self, account): super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) def process_response(self, response): if isinstance(response, Failure): if response.type is NegativeReplyError and response.value.code == 404: return [] return response return [ContactData(contact) for contact in response[u'Contacts']] class ContactData(dict): __fields__ = {u'contact', u'expires', u'received', u'user_agent'} def __init__(self, data): super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__}) self.setdefault(u'user_agent', None) if u'received' in self: parsed_received = urlparse.parse_qs(self[u'received']) if u'target' in parsed_received: self[u'NAT_contact'] = parsed_received[u'target'][0] else: self[u'NAT_contact'] = self[u'received'] del self[u'received'] else: self[u'NAT_contact'] = self[u'contact'] class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): log.debug('Got MI response: {}'.format(data)) try: response = json.loads(data) except ValueError: code, _, message = data.partition(' ') try: code = int(code) except ValueError: log.error('MI response from OpenSIPS cannot be parsed (neither JSON nor status reply)') return # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to # which request the response corresponds. The failed request will fail with timeout later. if len(self.transport.requests) == 1: _, request = self.transport.requests.popitem() request.deferred.errback(Failure(NegativeReplyError(code, message))) log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) else: log.error('Got MI status reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: try: request_id = response['id'] except KeyError: log.error('MI JSON response from OpenSIPS lacks id field') return if request_id not in self.transport.requests: log.error('MI JSON response from OpenSIPS has unknown id: {!r}'.format(request_id)) return request = self.transport.requests.pop(request_id) if 'result' in response: request.deferred.callback(response['result']) elif 'error' in response: log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) else: log.error('Invalid MI JSON response from OpenSIPS') request.deferred.errback(Failure(OpenSIPSError('Invalid MI JSON response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 def __init__(self): - socket_path = process.runtime_file('opensips.sock') + socket_path = process.runtime.file('opensips.sock') unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) def close(self): for request in self.transport.requests.values(): if not request.deferred.called: request.deferred.errback(Error('shutting down')) self.transport.requests.clear() self.transport.stopListening() unlink(self.path) def send(self, request): try: self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) except socket.error as e: log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: self.transport.requests[request.id] = request request.deferred.addBoth(request.process_response) reactor.callLater(self.timeout, self._did_timeout, request) log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred def _did_timeout(self, request): if not request.deferred.called: request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) self.transport.requests.pop(request.id) class ManagementInterface(object): __metaclass__ = Singleton def __init__(self): self.connection = UNIXSocketConnection() def reload_domains(self): return self.connection.send(DomainReload()) def reload_addresses(self): return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): return self.connection.send(EndDialog(dialog_id)) def get_online_devices(self, account): return self.connection.send(GetOnlineDevices(account)) def refresh_watchers(self, account, refresh_type): return self.connection.send(RefreshWatchers(account, refresh_type)) def update_subscriptions(self, account): return self.connection.send(UpdateSubscriptions(account)) diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py index b07b4be..f0e7518 100644 --- a/mediaproxy/tls.py +++ b/mediaproxy/tls.py @@ -1,89 +1,89 @@ """TLS support""" __all__ = ['X509Credentials'] import os import stat from application.process import process from gnutls import crypto from gnutls.interfaces import twisted from mediaproxy.configuration import TLSConfig class FileDescriptor(object): def __init__(self, name, type): certs_path = os.path.normpath(TLSConfig.certs_path) self.path = os.path.join(certs_path, name) self.klass = type self.timestamp = 0 self.object = None def get(self): - path = process.config_file(self.path) + path = process.configuration.file(self.path) if path is None: raise RuntimeError('missing or unreadable file: %s' % self.path) mtime = os.stat(path)[stat.ST_MTIME] if self.timestamp < mtime: f = open(path) try: self.object = self.klass(f.read()) self.timestamp = mtime finally: f.close() return self.object class X509Entity(object): type = None def __init__(self, name_attr): self.name_attr = name_attr self.descriptors = {} def __get__(self, obj, type_=None): name = getattr(obj or type_, self.name_attr, None) if name is None: return None descriptor = self.descriptors.setdefault(name, FileDescriptor(name, self.type)) return descriptor.get() def __set__(self, obj, value): raise AttributeError('cannot set attribute') def __delete__(self, obj): raise AttributeError('cannot delete attribute') class X509Certificate(X509Entity): type = crypto.X509Certificate class X509PrivateKey(X509Entity): type = crypto.X509PrivateKey class X509CRL(X509Entity): type = crypto.X509CRL class X509Credentials(twisted.X509Credentials): """SIPThor X509 credentials""" X509cert_name = None # will be defined by each instance X509key_name = None # will be defined by each instance X509ca_name = 'ca.pem' X509crl_name = 'crl.pem' X509cert = X509Certificate(name_attr='X509cert_name') X509key = X509PrivateKey(name_attr='X509key_name') X509ca = X509Certificate(name_attr='X509ca_name') X509crl = X509CRL(name_attr='X509crl_name') def __init__(self, cert_name): self.X509cert_name = '%s.crt' % cert_name self.X509key_name = '%s.key' % cert_name twisted.X509Credentials.__init__(self, self.X509cert, self.X509key, [self.X509ca], [self.X509crl]) self.verify_peer = True self.verify_period = TLSConfig.verify_interval diff --git a/test/common.py b/test/common.py index 58a0607..2330602 100644 --- a/test/common.py +++ b/test/common.py @@ -1,214 +1,216 @@ # Copyright (C) 2008 AG Projects # import sys sys.path.append(".") sys.path.append("..") import os import random import string import struct import mediaproxy -from application.system import host from application.configuration import * from application.process import process -process.system_config_directory = mediaproxy.config_directory - +from application.system import host +from twisted.internet import reactor +from twisted.internet.defer import Deferred, DeferredList, succeed from twisted.internet.protocol import DatagramProtocol, ClientFactory -from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.task import LoopingCall -from twisted.internet.defer import Deferred, DeferredList, succeed -from twisted.internet import reactor +from twisted.protocols.basic import LineOnlyReceiver from mediaproxy.headers import EncodingDict + +process.configuration.user_directory = None +process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory + + class Config(ConfigSection): __cfgfile__ = mediaproxy.configuration_file __section__ = 'Dispatcher' socket = "/run/mediaproxy/dispatcher.sock" - random_data = os.urandom(512) stun_data = struct.pack("!HHIIII", 0x0001, 0, 0x2112A442, 0, 0, 0) default_host_ip = host.default_ip class OpenSIPSControlClientProtocol(LineOnlyReceiver): def __init__(self): self.defer = None def lineReceived(self, line): if line == "error": print "got error from dispatcher!" reactor.stop() elif self.defer is not None: print "got ip/ports from dispatcher: %s" % line ip, ports = line.split(" ", 1) defer = self.defer self.defer = None defer.callback((ip, [int(i) for i in ports.split()])) else: print "got reply from dispatcher: %s" % line defer = self.defer self.defer = None defer.callback(line) def _send_command(self, command, headers): self.defer = Deferred() data = "\r\n".join([command] + ["%s: %s" % item for item in headers.iteritems()] + ["", ""]) #print "writing on socket:\n%s" % data self.transport.write(data) return self.defer def update(self, **kw_args): return self._send_command("update", EncodingDict(kw_args)) def remove(self, **kw_args): return self._send_command("remove", EncodingDict(kw_args)) class OpenSIPSConnectorFactory(ClientFactory): protocol = OpenSIPSControlClientProtocol def __init__(self): self.defer = Deferred() def buildProtocol(self, addr): prot = ClientFactory.buildProtocol(self, addr) reactor.callLater(0, self.defer.callback, prot) return prot class MediaReceiverProtocol(DatagramProtocol): def __init__(self, endpoint, index): self.endpoint = endpoint self.index = index self.loop = None self.received_media = False self.defer = Deferred() def datagramReceived(self, data, (host, port)): if not self.received_media: self.received_media = True print "received media %d for %s from %s:%d" % (self.index, self.endpoint.name, host, port) self.defer.callback(None) def connectionRefused(self): print "connection refused for media %d for %s" % (self.index, self.endpoint.name) class Endpoint(object): def __init__(self, sip_uri, user_agent, is_caller): if is_caller: self.name = "caller" else: self.name = "callee" self.sip_uri = sip_uri self.user_agent = user_agent self.tag = "".join(random.sample(string.ascii_lowercase, 8)) self.connectors = [] self.media = [] self.cseq = 1 def set_media(self, media): assert(len(self.connectors) == 0) self.media = media for index, (media_type, port, direction, parameters) in enumerate(self.media): if port != 0: protocol = MediaReceiverProtocol(self, index) connector = reactor.listenUDP(port, protocol) else: connector = None self.connectors.append(connector) return DeferredList([connector.protocol.defer for connector in self.connectors if connector is not None]) def get_media(self, use_old_hold): if use_old_hold: ip = "0.0.0.0" else: ip = default_host_ip return [(media_type, ip, port, direction, parameters) for media_type, port, direction, parameters in self.media] def start_media(self, ip, ports, send_stun=False): for port, connector in zip(ports, self.connectors): if connector is not None: protocol = connector.protocol if port != 0: protocol.transport.connect(ip, port) protocol.loop = LoopingCall(protocol.transport.write, send_stun and stun_data or random_data) protocol.loop.start(random.uniform(0.5, 1)) else: protocol.defer.callback(None) def stop_media(self): defers = [] for connector in self.connectors: if connector is not None: if connector.protocol.loop is not None: connector.protocol.loop.stop() connector.protocol.loop = None defer = connector.stopListening() if defer is not None: defers.append(defer) self.connectors = [] if defers: return DeferredList(defers) else: return succeed(None) class Session(object): def __init__(self, caller, callee): self.caller = caller self.callee = callee self.call_id = "".join(random.sample(string.ascii_letters, 24)) def _get_parties(self, party): party = getattr(self, party) if party is self.caller: other = self.callee else: other = self.caller return party, other def do_update(self, opensips, party, type, is_final, use_old_hold=False): party, other = self._get_parties(party) if type == "request": from_tag = party.tag to_tag = other.tag from_uri = party.sip_uri to_uri = other.sip_uri cseq = party.cseq else: from_tag = other.tag to_tag = party.tag from_uri = other.sip_uri to_uri = party.sip_uri cseq = other.cseq if is_final: defer = opensips.update(call_id = self.call_id, from_tag = from_tag, to_tag = to_tag, from_uri = from_uri, to_uri = to_uri, cseq = cseq, user_agent = party.user_agent, media = party.get_media(use_old_hold), type = type, dialog_id = "12345:67890") else: defer = opensips.update(call_id = self.call_id, from_tag = from_tag, to_tag = to_tag, from_uri = from_uri, to_uri = to_uri, cseq = cseq, user_agent = party.user_agent, media = party.get_media(use_old_hold), type = type, dialog_id = "12345:67890") if is_final: if type == "request": party.cseq += 1 else: other.cseq += 1 return defer def do_remove(self, opensips, party): party, other = self._get_parties(party) opensips.remove(call_id = self.call_id, from_tag = party.tag, to_tag = other.tag) def connect_to_dispatcher(): factory = OpenSIPSConnectorFactory() connector = reactor.connectUNIX(Config.socket, factory) return connector, factory.defer