diff --git a/README b/README index 8af63f5..c4445e6 100644 --- a/README +++ b/README @@ -1,325 +1,325 @@ MediaProxy ---------- -Copyright (c) 2008-2012 AG Projects +Copyright (c) 2008-2014 AG Projects http://ag-projects.com Authors: Ruud Klaver, Dan Pascu, Saul Ibarra Home page: http://mediaproxy.ag-projects.com License ------- This software is licensed according to the GNU General Public License version 2. See LICENSE file for more details. For other licensing options please contact sales-request@ag-projects.com Description ----------- MediaProxy is a media relay for RTP/RTCP and UDP streams that works in tandem with OpenSIPS to provide NAT traversal capability for media streams from SIP user agents located behind NAT. When using MediaProxy, NAT traversal for RTP media will work without any settings in the SIP User Agents or the NAT router. Features -------- - Scalability of thousands of calls per server limited only by the Linux kernel networking layer and network interface bandwidth - Supports multiple chained relays as long as each has a public IP - TLS encryption between the relays and dispatcher - T.38 fax support - Graceful shutdown capability - Automatic load balancing and redundancy among all media relays - Real-time sessions statistics - Configurable IP and UDP port range - Support for any combination of audio and video streams - Ability to use OpenSIPS' MI interface to close a call that did timeout - Radius accounting of IP network traffic - Database accounting of complete media information including all streams, their type, codecs and duration. - Supports ICE negotiation by behaving like a TURN relay candiate Background ----------- MediaProxy 2.0 is the second generation media relay application which is based on a completely new design that allows for major improvements in areas such as scalability (an order of magnitude more scalable than previous version) and security (communication between relay and dispatcher is encrypted). New features have been added to support call flows related to user mobility and fax transmission. Architecture ------------ MediaProxy consists of a dispatcher and one or more media relays. The dispatcher component always runs on the same host as OpenSIPS and communicates with its mediaproxy module through a UNIX domain socket. The relay(s) connect to the dispatcher using TLS. This relay component may be on the same or on a different host as OpenSIPS. There may be several relays for the dispatcher to choose from and a relay may service more than one dispatcher. When OpenSIPS requests that a call be relayed, the dispatcher will forward this request to one of its connected relays, along with some data from the SDP. The relay will allocate a set of UDP ports for this session, depending on the number of proposed streams. It will inform the dispatcher which ports it has allocated so that it may in turn notify the mediaproxy module of OpenSIPS, which will replace the relevant parts of the SDP. The same is done for any SIP messages from the callee, thus all the media streams will be sent through the relay. When the session between caller and callee has finished, either through a SIP BYE or because the media is no longer flowing and has timed out, the relay will send session information to the dispatcher, which can store this information using one or more accounting modules. The session information may also be queried using a management interface on the dispatcher. All of this is illustrated in the following diagram: +---+ +---+ | | +---------------------+ | | | | | SIP Proxy | | | | | | +----------+ | SIP | | | |<--+->| OpenSIPS |<------+------------------->| | | | | +----------+ | | | | | | ^ | | | | | | | UNIX socket | | | | C | | v | | C | | A | | +------------+ | +------------+ | A | | L | | | Dispatcher |<-----+-->| Management | | L | | L | | +------------+ TCP | | client | | L | | E | | ^ /TLS | +------------+ | E | | R | | | | | E | | | +---------+-----------+ | | | | | | | | | | TLS | | | | v | | | | +-------------+ UDP | | | |<---->| Relay |<----------------------->| | | | +-------------+ RTP / RTCP | | +---+ +---+ Please note that the accounting modules are not shown. Compatibility and pre-requisites -------------------------------- Both OpenSIPS and MediaProxy must use a public IP address. To run the software, you will need a server running the Linux Operating System using a kernel version 2.6.18 or higher that has been compiled with connection tracking support (conntrack). IPtables 1.4.3 or higher is also required. Because of this dependency on Linux, other operating systems are not supported. This dependency only applies to the media relay component. The dispatcher component which runs on the same host as OpenSIPS, can run on any platform that has a python interpreter and supports the twisted framework. Communication between the dispatcher and the relays uses TLS encryption and requires a set of X509 certificates to work. For more information about this please read tls/README which contains information about the sample certificates that are included as well as information about how to generate your own. MediaProxy is meant to be used together with OpenSIPS' mediaproxy module. This version of MediaProxy (2.0 or higher) cannot be used in combination with any version of OpenSIPS older than 1.4 or any components of MediaProxy older than 2.0. You must completely upgrade any previous installation of OpenSER to OpenSIPS to use this version of MediaProxy. No STUN or TURN support are required in the clients. The SIP User Agents must work symmetrically (that is to send and receive data on the same port for each stream), which is documented in RFC 4961. To display the history of the media streams CDRTool 6.5 or higher is required. Some features that were present in the previous version have been removed: - Support for specifying media relays per domain has been discontinued - Support for DNS records has been discontinued - Support for asymmetric clients has been discontinued - Support for other operating systems than Linux has been discontinued (only for the media relay, as the dispatcher has no such limitation) For information of how to install MediaProxy, please consult the INSTALL file. Operation --------- Before the relay is run, please make sure that /proc/sys/net/ipv4/ip_forward is set to "1". Also for newer kernels ACCT on connection tracking needs to be enabled. Therefore /proc/sys/net/netfilter/nf_conntrack_acct must be set to "1". Both the dispatcher and the relay should be executed with root privileges. With no arguments, both applications will automatically fork into the background and log to syslog. They can remain in the foreground and log to console when given the --no-fork argument. The relay can be shut down in two ways. When receiving either an INT or TERM signal, the relay will terminate all of its sessions immediately and inform the dispatcher that those sessions have expired. When given the HUP signal, it will not accept any new sessions from the dispatcher and wait for all of the running sessions to expire, thus terminating gracefully. At the very least a set of TLS credentials is required. Sample certificates for this are included in the tls/ subdirectory. DO NOT USE THESE IN A PRODUCTION ENVIRONMENT, but only for testing purposes. For more information about TLS certificates and how to generate your own, check the tls/README file. Accounting ---------- MediaProxy is capable to do additional per call accounting with information related to the media streams used by the call. MediaProxy has a modular interface to the accounting system, allowing for new modules to be easily implemented. Currently it supports database and radius backends. Multiple backends can be configured and used simultaneously. Radius accounting ----------------- The radius backend logs very basic information about the media streams. The limited nature of the logged information is mainly given by the limitations imposed by the radius protocol to the data size. The information sent in the radius packet is shown below: Acct-Status-Type = "Update" User-Name = "mediaproxy@default" Acct-Session-Id = call_id Sip-From-Tag = from_tag Sip-To-Tag = to_tag Acct-Session-Time = call duration Acct-Input-Octets = bytes received from caller Acct-Output-Octets = bytes received from callee NAS-IP-Address = media-relay address Sip-User-Agents = caller + callee user agents Sip-Applications = "Audio", "Video", ... Media-Codecs = codecs used by streams (comma separated) Media-Info = "timeout" or "" Acct-Delay-Time = post dial delay (seconds from INVITE to 1st media packet) Database accounting ------------------- The database backend logs all the information related to the media streams that were created/closed during the whole session. This information is stored as a JSON encoded string in a BLOB column in the database, along with the call_id, from_tag and to_tag columns that can be used to retrieve the media information for a given call. The database table and column names are fully configurable in the database section of the configuration file. The table used to store these records, is automatically created by the media dispatcher on startup, if it's not present. For this to happen, the user that is configured in the dburi option in the database section, must have the CREATE and ALTER rights on the database specified in the same dburi. If this is not possible, then the media dispatcher will log an error indicating why it could not create the table and also output the table definition that can be used by some human operator to manually create the table. However, the recommended way is to grant the CREATE and ALTER privileges to the user in the dburi over the database specified in the same dburi. The database module uses SQLObject to access the database, which means it can work with a lot of databases, by simply changing the scheme in the dburi. Currently the following databases are supported: mysql, postgres, sqlite, firebird, maxdb, mssql and sybase. Closing expired calls --------------------- Starting with version 2.1.0, MediaProxy supports closing calls for which all the media streams did timeout, but for which no BYE was received to close the call in the standard way. This feature will only work, when the OpenSIPS mediaproxy module uses the engage_media_proxy() command to start MediaProxy for a given call. In this case the mediaproxy module uses the dialog module to keep track of the call and can pass the dialog id to the media dispatcher. When a media session is expired because all streams did timeout, but no closing request was received from the proxy, the media dispatcher will use the dialog id that was received from the mediaproxy module, to issue a dlg_end_dlg request into the OpenSIPS' MI interface, instructing OpenSIPS to generate the BYEs for the call, closing it in a clean way and generating the accounting records. To use this, the mi_datagram module must be loaded and configured to use a UNIX filesystem socket which must also be configured into the OpenSIPS section of the MediaProxy configuration as socket_path. This feature is not available when using the use_media_proxy/end_media_session functions in the proxy configuration, because in that case there is no dialog that is tracked by the proxy which could be terminated using dlg_end_dlg. Management interface -------------------- The management interface will accept commands terminated by \r\n. It will return the results of the command, one per line, terminated by an empty line (also \r\n terminated). Currently two commands are supported: sessions : This will have the dispatcher query all of its connected relays for active sessions. For every sessions it finds it will return one line with a JSON encoded dictionary containing session information. summary : This will have the dispatcher present a summary of each of its connected relays. The results are returned as a JSON encoded dictionary, one line per relay. Free support ------------ MediaProxy is developed and supported by AG Projects. AG Projects offers best-effort free support for MediaProxy. "best-effort" means that we try to solve the bugs you report or help fix your problems as soon as we can, subject to available resources. You may report bugs or feature request to: users@lists.opensips.org A mailing list archive is available at: http://lists.opensips.org/cgi-bin/mailman/listinfo/users Commercial support ------------------ Commercial support options are available by purchasing: 1. Multimedia Service Platform: http://ag-projects.com/Products or 2. MediaProxy and support: https://secure.dns-hosting.info/buyer_guide.phtml diff --git a/mediaproxy/__init__.py b/mediaproxy/__init__.py index 463c08b..8e26399 100644 --- a/mediaproxy/__init__.py +++ b/mediaproxy/__init__.py @@ -1,31 +1,31 @@ -# Copyright (C) 2008 AG-Projects. +# Copyright (C) 2008-2014 AG-Projects. # """Mediaproxy implements a media relay for SIP calls""" __version__ = "2.6.0" system_config_directory = '/etc/mediaproxy' runtime_directory = '/var/run/mediaproxy' configuration_filename = 'config.ini' package_requirements = {'python-application': '1.2.8', 'python-gnutls': '1.1.8', 'twisted': '2.5.0'} try: from application.dependency import ApplicationDependencies, DependencyError except ImportError: class DependencyError(Exception): pass class ApplicationDependencies(object): def __init__(self, *args, **kw): pass def check(self): required_version = package_requirements['python-application'] raise DependencyError("need python-application version %s or higher but it's not installed" % required_version) dependencies = ApplicationDependencies(**package_requirements) diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 4282a5e..fcf8fe9 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,547 +1,546 @@ -# Copyright (C) 2008 AG Projects -# Author: Ruud Klaver +# Copyright (C) 2008-2014 AG Projects # """Implementation of the MediaProxy dispatcher""" import random import signal import cPickle as pickle import cjson from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials log.msg("Twisted is using %s" % reactor.__module__.rsplit('.', 1)[-1]) class ControlProtocol(LineOnlyReceiver): noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason): log.debug("Connection to %s lost: %s" % (self.description, reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + "\r\n") def _relay_error(self, failure): failure.trap(RelayError) log.error(failure.value) self.transport.write("error\r\n") def _catch_all(self, failure): log.error(failure.getBriefTraceback()) self.transport.write("error\r\n") def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._relay_error) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): description = "OpenSIPS" def __init__(self): self.line_buf = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == "": if self.line_buf: self.in_progress += 1 defer = self.factory.dispatcher.send_command(self.line_buf[0], self.line_buf[1:]) self._add_callbacks(defer) self.line_buf = [] elif not line.endswith(": "): self.line_buf.append(line) class ManagementControlProtocol(ControlProtocol): description = "Management interface client" def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ["quit", "exit"]: self.transport.loseConnection() elif line == "summary": defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == "sessions": defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == "version": self.reply(__version__) else: log.error("Unknown command on management interface: %s" % line) self.reply("error") class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): prot = Factory.buildProtocol(self, addr) self.protocols.append(prot) return prot def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): noisy = False MAX_LENGTH = 4096*1024 ## (4MB) def __init__(self): self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command, headers): log.debug('Issuing "%s" command to relay at %s' % (command, self.ip)) seq = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, seq, defer) self.commands[seq] = (command, defer, timer) self.transport.write("\r\n".join([" ".join([command, seq])] + headers + ["", ""])) return defer def _timeout(self, seq, defer): del self.commands[seq] defer.errback(RelayError("Relay at %s timed out" % self.ip)) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(" ", 1) except ValueError: first = line rest = "" if first == "expired": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: log.error("Unknown session with call_id %s expired at relay %s" % (call_id, self.ip)) return if session.relay_ip != self.ip: log.error("session with call_id %s expired at relay %s, but is actually at relay %s, ignoring" % (call_id, self.ip, session.relay_ip)) return log.msg("session with call_id %s from relay %s did timeout" % (call_id, session.relay_ip)) stats["dialog_id"] = session.dialog_id stats["timed_out"] = True all_streams_ice = all(stream_info["status"] == "unselected ICE candidate" for stream_info in stats["streams"]) stats["all_streams_ice"] = all_streams_ice self.factory.dispatcher.update_statistics(stats) if session.dialog_id is not None and stats["start_time"] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == "ping": if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.transport.write("pong\r\n") return try: command, defer, timer = self.commands.pop(first) except KeyError: log.error("Got unexpected response from relay at %s: %s" % (self.ip, line)) return timer.cancel() if rest == "error": defer.errback(RelayError("Received error from relay at %s in response to `%s' command" % (self.ip, command))) elif rest == "halting": self.halting = True defer.errback(RelayError("Relay at %s is shutting down" % self.ip)) elif command == "remove": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats["dialog_id"] = session.dialog_id stats["timed_out"] = False self.factory.dispatcher.update_statistics(stats) del self.factory.sessions[call_id] defer.callback("removed") else: # update command defer.callback(rest) def connectionLost(self, reason): if reason.type == ConnectionDone: log.msg("Connection with relay at %s was closed" % self.ip) elif reason.type == ConnectionReplaced: log.warn("Old connection with relay at %s was lost" % self.ip) else: log.error("Connection with relay at %s was lost: %s" % (self.ip, reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() defer.errback(RelayError("Relay at %s disconnected" % self.ip)) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) class DialogID(str): def __new__(cls, did): if did is None: return None try: h_entry, h_id = did.split(':') except: log.error("invalid dialog_id value: `%s'" % did) return None instance = str.__new__(cls, did) instance.h_entry = h_entry instance.h_id = h_id return instance class RelaySession(object): def __init__(self, relay_ip, command_headers): self.relay_ip = relay_ip self.dialog_id = DialogID(command_headers.get('dialog_id')) self.expire_time = None class RelayFactory(Factory): noisy = False protocol = RelayServerProtocol def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False state_file = process.runtime_file("dispatcher_state") try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warn("found %d expired sessions which were not removed during the last %d hours" % (len(obsolete), round(limit/3600.0))) return KeepRunning def buildProtocol(self, addr): ip = addr.host log.debug("Connection from relay at %s" % ip) prot = Factory.buildProtocol(self, addr) prot.ip = ip return prot def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: log.warn("Relay at %s reconnected, closing old connection" % relay.ip) reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected"))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command("sessions", []) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session["call_id"] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: log.warn("Session %s is no longer on relay %s, statistics are probably lost" % (session_id, relay_ip)) if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command, headers): try: parsed_headers = dict(header.split(": ", 1) for header in headers) except: raise RelayError("Could not parse headers from OpenSIPs") try: call_id = parsed_headers["call_id"] except KeyError: raise RelayError("Missing call_id header") session = self.sessions.get(call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: raise RelayError("Relay for this session (%s) is no longer connected" % relay) return self.relays[relay].send_command(command, headers) ## We do not have a session for this call_id or the session is already expired if command == "update": preferred_relay = parsed_headers.get("media_relay") try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warn("user requested media_relay %s is not available" % preferred_relay) defer = self._try_next(try_relays, command, headers) defer.addCallback(self._add_session, try_relays, call_id, parsed_headers) return defer elif command == 'remove' and session: ## This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[call_id] return 'removed' else: raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id)) def _add_session(self, result, try_relays, call_id, parsed_headers): self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers) return result def _relay_error(self, failure, try_relays, command, headers): failure.trap(RelayError) failed_relay = try_relays.popleft() log.warn("Relay %s failed: %s" % (failed_relay, failure.value)) return self._try_next(try_relays, command, headers) def _try_next(self, try_relays, command, headers): if len(try_relays) == 0: raise RelayError("No suitable relay found") defer = try_relays[0].send_command(command, headers) defer.addErrback(self._relay_error, try_relays, command, headers) return defer def get_summary(self): defer = DeferredList([relay.send_command("summary", []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, ip): log.error("Error processing query at relay %s: %s" % (ip, failure.value)) return cjson.encode(dict(status="error", ip=ip)) def _got_summaries(self, results): return "[%s]" % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): defer = DeferredList([relay.send_command("sessions", []) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer def _got_statistics(self, results): return "[%s]" % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result!='[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug("Doing cleanup for old relay %s" % ip) del self.cleanup_timers[ip] for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]: del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): pickle.dump(self.sessions, open(process.runtime_file("dispatcher_state"), "w")) class Dispatcher(object): def __init__(self): self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.cred, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime_file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.cred, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command, headers): return maybeDeferred(self.relay_factory.send_command, command, headers) def update_statistics(self, stats): log.debug("Got statistics: %s" % stats) if stats["start_time"] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.error("An unhandled error occured while doing accounting: %s" % e) log.err() def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down.") reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self._shutdown) def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/headers.py b/mediaproxy/headers.py index cef2907..0f461da 100644 --- a/mediaproxy/headers.py +++ b/mediaproxy/headers.py @@ -1,108 +1,107 @@ -# Copyright (C) 2008 AG Projects -# Author: Ruud Klaver +# Copyright (C) 2008-2014 AG Projects # """Header encoding and decoding rules for communication between the dispatcher and relay components""" class EncodingError(Exception): pass class DecodingError(Exception): pass class MediaProxyHeaders(object): @classmethod def encode(cls, name, value): func_name = "encode_%s" % name if hasattr(cls, func_name): return getattr(cls, func_name)(value) else: return value @classmethod def decode(cls, name, value): func_name = "decode_%s" % name if hasattr(cls, func_name): return getattr(cls, func_name)(value) else: return value @staticmethod def encode_cseq(value): return str(value) @staticmethod def decode_cseq(value): try: return int(value) except ValueError: raise DecodingError("Not an integer: %s" % value) @staticmethod def encode_type(value): if value not in ["request", "reply"]: raise EncodingError('"type" header should be either "request" or "reply"') return value @staticmethod def decode_type(value): if value not in ["request", "reply"]: raise DecodingError('"type" header should be either "request" or "reply"') return value @staticmethod def encode_media(value): try: return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.iteritems()]) for type, ip, port, direction, parameters in value) except: raise EncodingError("Ill-formatted media information") @staticmethod def decode_media(value): try: streams = [] for stream_data in (data for data in value.split(",") if data): stream_data = stream_data.split(":") type, ip, port, direction = stream_data[:4] parameters = dict(param.split("=") for param in stream_data[4:] if param) streams.append((type, ip, int(port), direction, parameters)) return streams except: raise DecodingError("Ill-formatted media header") class CodingDict(dict): def __init__(self, *args, **kwargs): if not args and not kwargs: it = [] elif kwargs: it = kwargs.iteritems() elif isinstance(args[0], dict): it = args[0].iteritems() else: try: it = iter(args[0]) except: dict.__init__(self, *args, **kwargs) return dict.__init__(self) for key, value in it: self.__setitem__(key, value) class EncodingDict(CodingDict): def __setitem__(self, key, value): encoded_value = MediaProxyHeaders.encode(key, value) dict.__setitem__(self, key, encoded_value) class DecodingDict(CodingDict): def __setitem__(self, key, value): decoded_value = MediaProxyHeaders.decode(key, value) dict.__setitem__(self, key, decoded_value) diff --git a/mediaproxy/interfaces/__init__.py b/mediaproxy/interfaces/__init__.py index 2d987ed..2c6e1a8 100644 --- a/mediaproxy/interfaces/__init__.py +++ b/mediaproxy/interfaces/__init__.py @@ -1,5 +1,5 @@ -# Copyright (C) 2008 AG-Projects. +# Copyright (C) 2008-2014 AG-Projects. # """Interfaces between Mediaproxy and the other components in the system""" diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py index 83ccbb0..a6a7cc3 100644 --- a/mediaproxy/interfaces/opensips.py +++ b/mediaproxy/interfaces/opensips.py @@ -1,159 +1,159 @@ -# Copyright (C) 2006-2008 AG Projects. +# Copyright (C) 2006-2014 AG Projects. # """The OpenSIPS Management Interface""" import socket from collections import deque from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.failure import Failure from application.python.types import Singleton from application.process import process from application.system import unlink from application import log from mediaproxy.configuration import OpenSIPSConfig class Error(Exception): pass class CommandError(Error): pass class TimeoutError(Error): pass class NegativeReplyError(Error): pass class Request(object): def __init__(self, command): self.command = command self.deferred = defer.Deferred() class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): deferred = self.transport.deferred if deferred is None or deferred.called: return # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan if not data: failure = Failure(CommandError("Empty reply from OpenSIPS")) deferred.errback(failure) return try: status, msg = data.split('\n', 1) except ValueError: failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")) deferred.errback(failure) return if status.upper() == '200 OK': deferred.callback(msg) else: deferred.errback(Failure(NegativeReplyError(status))) class UNIXSocketConnection(object): timeout = 3 def __init__(self, socket_path): self._initialized = False self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) reactor.addSystemEventTrigger('during', 'shutdown', self.close) self.transport.deferred = None ## placeholder for the deferred used by a request self._initialized = True def close(self): if self._initialized: self.transport.stopListening() unlink(self.path) def _get_deferred(self): return self.transport.deferred def _set_deferred(self, d): self.transport.deferred = d deferred = property(_get_deferred, _set_deferred) def _did_timeout(self, deferred): if deferred.called: return deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) def send(self, request): self.deferred = request.deferred try: self.transport.write(request.command, OpenSIPSConfig.socket_path) except socket.error, why: log.error("cannot write request to `%s': %s" % (OpenSIPSConfig.socket_path, why[1])) self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS"))) else: reactor.callLater(self.timeout, self._did_timeout, self.deferred) class UNIXSocketConnectionPool(object): """Pool of UNIX socket connection to OpenSIPS""" def __init__(self, max_connections=10, pool_id=''): assert max_connections > 0, 'maximum should be > 0' self.max = max_connections self.id = pool_id self.workers = 0 self.waiters = deque() self.connections = deque() def _create_connections_as_needed(self): while self.workers < self.max and len(self.waiters) > len(self.connections): socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1) socket_path = process.runtime_file(socket_name) unlink(socket_path) try: conn = UNIXSocketConnection(socket_path) except CannotListenError, why: log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why)) break self.connections.append(conn) self.workers += 1 def _release_connection(self, result, conn): self.connections.append(conn) self._process_waiters() return result def _process_waiters(self): while self.waiters: try: conn = self.connections.popleft() except IndexError: return request = self.waiters.popleft() request.deferred.addBoth(self._release_connection, conn) conn.send(request) def defer_to_connection(self, command): request = Request(command) self.waiters.append(request) self._create_connections_as_needed() self._process_waiters() return request.deferred class ManagementInterface(object): __metaclass__ = Singleton def __init__(self): self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections) ## Reply handlers __RH_xxx def __RH_end_dialog(self, result): if isinstance(result, Failure): log.error("failed to end dialog: %s" % result.value) return False return True def end_dialog(self, dialog_id): cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id) return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog) diff --git a/mediaproxy/iputils.py b/mediaproxy/iputils.py index 3d2cb1c..e1237a2 100644 --- a/mediaproxy/iputils.py +++ b/mediaproxy/iputils.py @@ -1,47 +1,47 @@ -# Copyright (C) 2008 AG Projects +# Copyright (C) 2008-2014 AG Projects # """IP address utilities""" __all__ = ["is_routable_ip"] import socket import struct from application.configuration.datatypes import NetworkRangeList from mediaproxy.configuration import RelayConfig # Non routable network addresses (RFC 3330) # _non_routable_netlist = [ '0.0.0.0/8', '10.0.0.0/8', '127.0.0.0/8', '169.254.0.0/16', '172.16.0.0/12', '192.0.2.0/24', '192.168.0.0/16', '198.51.100.0/24', '203.0.113.0/24', '224.0.0.0/4', '255.255.255.255/32' ] _non_routable_nets = NetworkRangeList(_non_routable_netlist) def is_routable_ip(ip): try: ip_addr = struct.unpack('!L', socket.inet_aton(ip))[0] except: return False for netbase, mask in RelayConfig.routable_private_ranges: if (ip_addr & mask) == netbase: return True for netbase, mask in _non_routable_nets: if (ip_addr & mask) == netbase: return False return True diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py index 99a1f61..6abe3a1 100644 --- a/mediaproxy/mediacontrol.py +++ b/mediaproxy/mediacontrol.py @@ -1,796 +1,795 @@ -# Copyright (C) 2008 AG Projects -# Author: Ruud Klaver +# Copyright (C) 2008-2014 AG Projects # import struct from time import time from collections import deque from operator import attrgetter from itertools import chain from application import log from twisted.internet import reactor from twisted.internet.interfaces import IReadDescriptor from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.log import Logger from zope.interface import implements from mediaproxy.configuration import RelayConfig from mediaproxy.interfaces.system import _conntrack from mediaproxy.iputils import is_routable_ip from mediaproxy.scheduler import RecurrentCall, KeepRunning UDP_TIMEOUT_FILE = "/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream" rtp_payloads = { 0: "G711u", 1: "1016", 2: "G721", 3: "GSM", 4: "G723", 5: "DVI4", 6: "DVI4", 7: "LPC", 8: "G711a", 9: "G722", 10: "L16", 11: "L16", 14: "MPA", 15: "G728", 18: "G729", 25: "CelB", 26: "JPEG", 28: "nv", 31: "H261", 32: "MPV", 33: "MP2T", 34: "H263" } class RelayPortsExhaustedError(Exception): pass if RelayConfig.relay_ip is None: raise RuntimeError("Could not determine default host IP; either add default route or specify relay IP manually") class Address(object): """Representation of an endpoint address""" def __init__(self, host, port, in_use=True, got_rtp=False): self.host = host self.port = port self.in_use = self.__nonzero__() and in_use self.got_rtp = got_rtp def __len__(self): return 2 def __nonzero__(self): return None not in (self.host, self.port) def __getitem__(self, index): return (self.host, self.port)[index] def __contains__(self, item): return item in (self.host, self.port) def __iter__(self): yield self.host yield self.port def __str__(self): return self.__nonzero__() and ("%s:%d" % (self.host, self.port)) or "Unknown" def __repr__(self): return "%s(%r, %r, in_use=%r, got_rtp=%r)" % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp) def forget(self): self.host, self.port, self.in_use, self.got_rtp = None, None, False, False @property def unknown(self): return None in (self.host, self.port) @property def obsolete(self): return self.__nonzero__() and not self.in_use class Counters(dict): def __add__(self, other): n = Counters(self) for k, v in other.iteritems(): n[k] += v return n def __iadd__(self, other): for k, v in other.iteritems(): self[k] += v return self @property def caller_bytes(self): return self['caller_bytes'] @property def callee_bytes(self): return self['callee_bytes'] @property def caller_packets(self): return self['caller_packets'] @property def callee_packets(self): return self['callee_packets'] @property def relayed_bytes(self): return self['caller_bytes'] + self['callee_bytes'] @property def relayed_packets(self): return self['caller_packets'] + self['callee_packets'] class StreamListenerProtocol(DatagramProtocol): noisy = False def __init__(self): self.cb_func = None self.sdp = None self.send_packet_count = 0 self.stun_queue = [] def datagramReceived(self, data, (host, port)): if self.cb_func is not None: self.cb_func(host, port, data) def set_remote_sdp(self, ip, port): if is_routable_ip(ip): self.sdp = ip, port else: self.sdp = None def send(self, data, is_stun, ip=None, port=None): if is_stun: self.stun_queue.append(data) if ip is None or port is None: # this means that we have not received any packets from this host yet, # so we have not learnt its address if self.sdp is None: # we can't do anything if we haven't received the SDP IP yet or # it was in a private range return ip, port = self.sdp # we learnt the IP, empty the STUN packets queue if self.stun_queue: for data in self.stun_queue: self.transport.write(data, (ip, port)) self.stun_queue = [] if not is_stun: if not self.send_packet_count % RelayConfig.userspace_transmit_every: self.transport.write(data, (ip, port)) self.send_packet_count += 1 def _stun_test(data): # Check if data is a STUN request and if it's a binding request if len(data) < 20: return False, False msg_type, msg_len, magic = struct.unpack("!HHI", data[:8]) if msg_type & 0xc == 0 and magic == 0x2112A442: if msg_type == 0x0001: return True, True else: return True, False else: return False, False class MediaSubParty(object): def __init__(self, substream, listener): self.substream = substream self.listener = listener self.listener.protocol.cb_func = self.got_data self.remote = Address(None, None) host = self.listener.protocol.transport.getHost() self.local = Address(host.host, host.port) self.timer = None self.codec = "Unknown" self.got_stun_probing = False self.reset() def reset(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout) self.remote.in_use = False # keep remote address around but mark it as obsolete self.remote.got_rtp = False self.got_stun_probing = False self.listener.protocol.send_packet_count = 0 def before_hold(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, "on hold timeout", RelayConfig.on_hold_timeout) def after_hold(self): if self.timer and self.timer.active(): self.timer.cancel() if not self.remote.in_use: self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, "no-traffic timeout", RelayConfig.stream_timeout) def got_data(self, host, port, data): if (host, port) == tuple(self.remote): if self.remote.obsolete: # the received packet matches the previously used IP/port, # which has been made obsolete, so ignore it return else: if self.remote.in_use: # the received packet is different than the recorded IP/port, # so we will discard it return # we have learnt the remote IP/port self.remote.host, self.remote.port = host, port self.remote.in_use = True log.debug("Got traffic information for stream: %s" % self.substream.stream) is_stun, is_binding_request = _stun_test(data) self.substream.send_data(self, data, is_stun) if not self.remote.got_rtp and not is_stun: # This is the first RTP packet received self.remote.got_rtp = True if self.timer: if self.timer.active(): self.timer.cancel() self.timer = None if self.codec == "Unknown" and self.substream is self.substream.stream.rtp: try: pt = ord(data[1]) & 127 except IndexError: pass else: if pt > 95: self.codec = "Dynamic(%d)" % pt elif pt in rtp_payloads: self.codec = rtp_payloads[pt] else: self.codec = "Unknown(%d)" % pt self.substream.check_create_conntrack() if is_binding_request: self.got_stun_probing = True def cleanup(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = None self.listener.protocol.cb_func = None self.substream = None class MediaSubStream(object): def __init__(self, stream, listener_caller, listener_callee): self.stream = stream self.forwarding_rule = None self.caller = MediaSubParty(self, listener_caller) self.callee = MediaSubParty(self, listener_callee) self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0) @property def counters(self): """Accumulated counters from all the forwarding rules the stream had""" if self.forwarding_rule is None: return self._counters else: try: return self._counters + self.forwarding_rule.counters except _conntrack.Error: return self._counters def _stop_relaying(self): if self.forwarding_rule is not None: try: self._counters += self.forwarding_rule.counters except _conntrack.Error: pass self.forwarding_rule = None def reset(self, party): if party == "caller": self.caller.reset() else: self.callee.reset() self._stop_relaying() def check_create_conntrack(self): if self.stream.first_media_time is None: self.stream.first_media_time = time() if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp: self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark) self.forwarding_rule.expired_func = self.conntrack_expired def send_data(self, source, data, is_stun): if source is self.caller: dest = self.callee else: dest = self.caller if dest.remote: # if we have already learnt the remote address of the destination, use that ip, port = dest.remote.host, dest.remote.port dest.listener.protocol.send(data, is_stun, ip, port) else: # otherwise use the IP/port specified in the SDP, if public dest.listener.protocol.send(data, is_stun) def conntrack_expired(self): try: timeout_wait = int(open(UDP_TIMEOUT_FILE).read()) except: timeout_wait = 0 self.expired("conntrack timeout", timeout_wait) def expired(self, reason, timeout_wait): self._stop_relaying() self.stream.substream_expired(self, reason, timeout_wait) def cleanup(self): self.caller.cleanup() self.callee.cleanup() self._stop_relaying() self.stream = None class MediaParty(object): def __init__(self, stream): self.manager = stream.session.manager self._remote_sdp = None self.is_on_hold = False self.uses_ice = False while True: self.listener_rtp = None self.ports = port_rtp, port_rtcp = self.manager.get_ports() try: self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) except CannotListenError: if self.listener_rtp is not None: self.listener_rtp.stopListening() self.manager.set_bad_ports(self.ports) log.warn("Cannot use port pair %d/%d" % self.ports) else: break def _get_remote_sdp(self): return self._remote_sdp def _set_remote_sdp(self, (ip, port)): self._remote_sdp = ip, port self.listener_rtp.protocol.set_remote_sdp(ip, port) remote_sdp = property(_get_remote_sdp, _set_remote_sdp) def cleanup(self): self.listener_rtp.stopListening() self.listener_rtcp.stopListening() self.manager.free_ports(self.ports) self.manager = None class MediaStream(object): def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party): self.is_alive = True self.session = session self.media_type = media_type self.caller = MediaParty(self) self.callee = MediaParty(self) self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp) self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp) getattr(self, initiating_party).remote_sdp = (media_ip, media_port) getattr(self, initiating_party).uses_ice = (media_parameters.get("ice", "no") == "yes") self.check_hold(initiating_party, direction, media_ip) self.create_time = time() self.first_media_time = None self.start_time = None self.end_time = None self.status = "active" self.timeout_wait = 0 def __str__(self): if self.caller.remote_sdp is None: src = "Unknown" else: src = "%s:%d" % self.caller.remote_sdp if self.caller.is_on_hold: src += " ON HOLD" if self.caller.uses_ice: src += " (ICE)" if self.callee.remote_sdp is None: dst = "Unknown" else: dst = "%s:%d" % self.callee.remote_sdp if self.callee.is_on_hold: dst += " ON HOLD" if self.callee.uses_ice: dst += " (ICE)" rtp = self.rtp rtcp = self.rtcp return "(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)" % ( self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote) @property def counters(self): return self.rtp.counters + self.rtcp.counters @property def is_on_hold(self): return self.caller.is_on_hold or self.callee.is_on_hold def check_hold(self, party, direction, ip): previous_hold = self.is_on_hold party = getattr(self, party) if direction == "sendonly" or direction == "inactive": party.is_on_hold = True elif ip == "0.0.0.0": party.is_on_hold = True else: party.is_on_hold = False if previous_hold and not self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = "active" subparty.after_hold() if not previous_hold and self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = "on hold" subparty.before_hold() def reset(self, party, media_ip, media_port): self.rtp.reset(party) self.rtcp.reset(party) getattr(self, party).remote_sdp = (media_ip, media_port) def substream_expired(self, substream, reason, timeout_wait): if substream is self.rtp and reason == "no-traffic timeout" and self.caller.uses_ice and self.callee.uses_ice and (substream.caller.got_stun_probing or substream.callee.got_stun_probing): reason = "unselected ICE candidate" if substream is self.rtcp or (self.is_on_hold and reason=='conntrack timeout'): # Forget about the remote addresses, this will cause any # re-occurence of the same traffic to be forwarded again substream.caller.remote.forget() substream.caller.listener.protocol.send_packet_count = 0 substream.callee.remote.forget() substream.callee.listener.protocol.send_packet_count = 0 else: session = self.session self.cleanup(reason) self.timeout_wait = timeout_wait session.stream_expired(self) def cleanup(self, status="closed"): if self.is_alive: self.is_alive = False self.status = status self.caller.cleanup() self.callee.cleanup() self.rtp.cleanup() self.rtcp.cleanup() self.session = None self.end_time = time() class Session(object): def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark = 0): self.manager = manager self.dispatcher = dispatcher self.call_id = call_id self.from_tag = from_tag self.to_tag = None self.mark = mark self.from_uri = from_uri self.to_uri = to_uri self.caller_ua = None self.callee_ua = None self.cseq = None self.previous_cseq = None self.streams = {} self.start_time = None self.end_time = None self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq) def __str__(self): return "%s: %s (%s) --> %s" % (self.call_id, self.from_uri, self.from_tag, self.to_uri) def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq): if self.cseq is None: old_cseq = (0,0) else: old_cseq = self.cseq if is_caller_cseq: cseq = (cseq, old_cseq[1]) if self.to_tag is None and to_tag is not None: self.to_tag = to_tag else: cseq = (old_cseq[0], cseq) if is_downstream: party = "caller" if self.caller_ua is None: self.caller_ua = user_agent else: party = "callee" if self.callee_ua is None: self.callee_ua = user_agent if self.cseq is None or cseq > self.cseq: if not media_list: return log.debug("Received new SDP offer") self.streams[cseq] = new_streams = [] if self.cseq is None: old_streams = [] else: old_streams = self.streams[self.cseq] for media_type, media_ip, media_port, media_direction, media_parameters in media_list: stream = None for old_stream in old_streams: old_remote = getattr(old_stream, party).remote_sdp if old_remote is not None: old_ip, old_port = old_remote else: old_ip, old_port = None, None if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))): stream = old_stream stream.check_hold(party, media_direction, media_ip) log.debug("Found matching existing stream: %s" % stream) break if stream is None: stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party) log.debug("Added new stream: %s" % stream) if media_port == 0: stream.cleanup() log.debug("Stream explicitly closed: %s" % stream) new_streams.append(stream) if self.previous_cseq is not None: for stream in self.streams[self.previous_cseq]: if stream not in self.streams[self.cseq] + new_streams: stream.cleanup() self.previous_cseq = self.cseq self.cseq = cseq elif self.cseq == cseq: log.debug("Received updated SDP answer") now = time() if self.start_time is None: self.start_time = now current_streams = self.streams[cseq] for stream in current_streams: if stream.start_time is None: stream.start_time = now if to_tag is not None and not media_list: return if len(media_list) < len(current_streams): for stream in current_streams[len(media_list):]: log.debug("Stream rejected by not being included in the SDP answer: %s" % stream) stream.cleanup("rejected") if stream.start_time is None: stream.start_time = now for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list): if stream.media_type != media_type: raise ValueError('Media types do not match: "%s" and "%s"' % (stream.media_type, media_type)) if media_port == 0: log.debug("Stream explicitly rejected: %s" % stream) stream.cleanup("rejected") continue stream.check_hold(party, media_direction, media_ip) party_info = getattr(stream, party) party_info.uses_ice = (media_parameters.get("ice", "no") == "yes") if party_info.remote_sdp is None or party_info.remote_sdp[0] == "0.0.0.0": party_info.remote_sdp = (media_ip, media_port) log.debug("Got initial answer from %s for stream: %s" % (party, stream)) else: if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'): stream.reset(party, media_ip, media_port) log.debug("Updated %s for stream: %s" % (party, stream)) else: log.debug("Unchanged stream: %s" % stream) if self.previous_cseq is not None: for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]: log.debug("Removing old stream: %s" % stream) stream.cleanup() else: log.debug("Received old CSeq %d:%d, ignoring" % cseq) def get_local_media(self, is_downstream, cseq, is_caller_cseq): if is_caller_cseq: pos = 0 else: pos = 1 try: cseq = max(key for key in self.streams.keys() if key[pos] == cseq) except ValueError: return None if is_downstream: retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]] else: retval = [(stream.status in ["active", "on hold"]) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]] return retval def cleanup(self): self.end_time = time() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: for stream in self.streams[cseq]: stream.cleanup() def stream_expired(self, stream): active_streams = set() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: active_streams.update([stream for stream in self.streams[cseq] if stream.is_alive]) if len(active_streams) == 0: self.manager.session_expired(self.call_id, self.from_tag) @property def duration(self): if self.start_time is not None: if self.end_time is not None: return int(self.end_time - self.start_time) else: return int(time() - self.start_time) else: return 0 @property def relayed_bytes(self): return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues()))) @property def statistics(self): all_streams = set(chain(*self.streams.itervalues())) attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration') stats = dict((name, getattr(self, name)) for name in attributes) stats['caller_ua'] = self.caller_ua or 'Unknown' stats['callee_ua'] = self.callee_ua or 'Unknown' stats['streams'] = streams = [] stream_attributes = ('media_type', 'status', 'timeout_wait') for stream in sorted(all_streams, key=attrgetter('start_time')): info = dict((name, getattr(stream, name)) for name in stream_attributes) info['caller_codec'] = stream.rtp.caller.codec info['callee_codec'] = stream.rtp.callee.codec if stream.start_time is None: info['start_time'] = info['end_time'] = None elif self.start_time is None: info['start_time'] = info['end_time'] = 0 else: info['start_time'] = max(int(stream.start_time - self.start_time), 0) if stream.status == 'rejected': info['end_time'] = info['start_time'] else: if stream.end_time is None: info['end_time'] = stats['duration'] else: info['end_time'] = min(int(stream.end_time - self.start_time), self.duration) if stream.first_media_time is None: info['post_dial_delay'] = None else: info['post_dial_delay'] = stream.first_media_time - stream.create_time caller = stream.rtp.caller callee = stream.rtp.callee info.update(stream.counters) info['caller_local'] = str(caller.local) info['callee_local'] = str(callee.local) info['caller_remote'] = str(caller.remote) info['callee_remote'] = str(callee.remote) streams.append(info) return stats class SessionManager(Logger): implements(IReadDescriptor) def __init__(self, relay, start_port, end_port): self.relay = relay self.ports = deque((i, i+1) for i in xrange(start_port, end_port, 2)) self.bad_ports = deque() self.sessions = {} self.watcher = _conntrack.ExpireWatcher() self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement self.bps_relayed = 0 if RelayConfig.traffic_sampling_period > 0: self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed) else: self.speed_calculator = None reactor.addReader(self) def _measure_speed(self): start_time = time() current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues()) self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period self.active_byte_counter = current_byte_counter self.closed_byte_counter = 0 us_taken = int((time() - start_time) * 1000000) if us_taken > 10000: log.warn("Aggregate speed calculation time exceeded 10ms: %d us for %d sessions" % (us_taken, len(self.sessions))) return KeepRunning # implemented for IReadDescriptor def fileno(self): return self.watcher.fd def doRead(self): stream = self.watcher.read() if stream: stream.expired_func() def connectionLost(self, reason): reactor.removeReader(self) # port management def get_ports(self): if len(self.bad_ports) > len(self.ports): log.debug("Excessive amount of bad ports, doing cleanup") self.ports.extend(self.bad_ports) self.bad_ports = deque() try: return self.ports.popleft() except IndexError: raise RelayPortsExhaustedError() def set_bad_ports(self, ports): self.bad_ports.append(ports) def free_ports(self, ports): self.ports.append(ports) # called by higher level def _find_session_key(self, call_id, from_tag, to_tag): key_from = (call_id, from_tag) if key_from in self.sessions: return key_from if to_tag: key_to = (call_id, to_tag) if key_to in self.sessions: return key_to return None def has_session(self, call_id, from_tag, to_tag=None, **kw): return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None) def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) if key: session = self.sessions[key] log.debug("updating existing session %s" % session) is_downstream = (session.from_tag != from_tag) ^ (type == "request") is_caller_cseq = (session.from_tag == from_tag) session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq) elif type == "reply" and not media: return None else: is_downstream = type == "request" is_caller_cseq = True session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq) self.sessions[(call_id, from_tag)] = session self.relay.add_session(dispatcher) log.debug("created new session %s" % session) return session.get_local_media(is_downstream, cseq, is_caller_cseq) def remove_session(self, call_id, from_tag, to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) try: session = self.sessions[key] except KeyError: log.warn("The dispatcher tried to remove a session which is no longer present on the relay") return None log.debug("removing session %s" % session) session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] reactor.callLater(0, self.relay.remove_session, session.dispatcher) return session def session_expired(self, call_id, from_tag): key = (call_id, from_tag) try: session = self.sessions[key] except KeyError: log.warn("A session expired that was no longer present on the relay") return log.debug("expired session %s" % session) session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] self.relay.session_expired(session) self.relay.remove_session(session.dispatcher) def cleanup(self): if self.speed_calculator is not None: self.speed_calculator.cancel() for key in self.sessions.keys(): self.session_expired(*key) @property def statistics(self): return [session.statistics for session in self.sessions.itervalues()] @property def stream_count(self): stream_count = {} for session in self.sessions.itervalues(): for stream in set(chain(*session.streams.itervalues())): if stream.is_alive: stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1 return stream_count diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index 76e6c72..56d8283 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,389 +1,388 @@ -# Copyright (C) 2008 AG Projects -# Author: Ruud Klaver +# Copyright (C) 2008-2014 AG Projects # """Implementation of the MediaProxy relay""" import cjson import signal import resource from time import time try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError("mandatory epoll reactor support is missing from the twisted framework") from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials ## Increase the system limit for the maximum number of open file descriptors ## to be able to handle connections to all ports in port_range try: fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError("Cannot set resource limit for maximum open file descriptors to %d" % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.msg("Set resource limit for maximum open file descriptors to %d" % fd_limit) class RelayClientProtocol(LineOnlyReceiver): noisy = False required_headers = {'update': set(['call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type']), 'remove': set(['call_id', 'from_tag']), 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: # 3 keepalives in a row didn't get an answer. assume connection is down. log.error("missed 3 keepalive answers in a row. assuming the connection is down.") # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None self.transport.write("ping\r\n") self._queued_keepalives += 1 return KeepRunning def connectionMade(self): peer = self.transport.getPeer() log.debug("Connected to dispatcher at %s:%d" % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not RelayConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error("Could not decode command/sequence number pair from dispatcher: %s" % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error("Unknown command: %s" % command) self.transport.write("%s error\r\n" % seq) elif line == "": try: missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error("Missing mandatory header '%s' from '%s' command" % (header, self.command)) response = "error" else: try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except: log.err() response = "error" finally: self.transport.write("%s %s\r\n" % (self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error("Unable to parse header: %s" % line) else: try: self.headers[name] = value except DecodingError, e: log.error("Could not decode header: %s" % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() if reason.type != ConnectionDone: log.error("Connection with dispatcher at %(host)s:%(port)d was lost: %%s" % connector.__dict__ % reason.value) else: log.msg("Connection with dispatcher at %(host)s:%(port)d was closed" % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): def __init__(self): self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning def _cb_got_srv(self, (answers, auth, add), port): for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) reactor.run(installSignalHandlers=False) def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down after all sessions have expired.") reactor.callFromThread(self.shutdown, graceful=True) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self.shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self.shutdown) def shutdown(self, graceful=False): raise NotImplementedError() def on_shutdown(self): pass def _shutdown(self): reactor.stop() self.on_shutdown() try: from mediaproxy.sipthor import SIPThorMediaRelayBase MediaRelayBase = SIPThorMediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() MediaRelayBase.__init__(self) @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): if new_dispatcher in self.old_connectors.iterkeys(): log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: log.debug('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.cred) for old_dispatcher in self.dispatchers.difference(dispatchers): log.debug('Removing old dispatcher at %s:%d' % old_dispatcher) self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == "summary": summary = {'ip' : RelayConfig.relay_ip, 'version' : __version__, 'status' : self.status, 'uptime' : int(time() - self.start_time), 'session_count' : len(self.session_manager.sessions), 'stream_count' : self.session_manager.stream_count, 'bps_relayed' : self.session_manager.bps_relayed} return cjson.encode(summary) elif command == "sessions": return cjson.encode(self.session_manager.statistics) elif command == "update": if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): log.debug("cannot add new session: media-relay is shutting down") return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error("Could not reserve relay ports for session, all allocated ports are being used") return "error" if local_media: return " ".join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) else: # remove session = self.session_manager.remove_session(**headers) if session is None: return "error" else: return cjson.encode(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == "connected": connector.transport.write(" ".join(["expired", cjson.encode(session.statistics)]) + "\r\n") else: log.warn("dispatcher for expired session is no longer online, statistics are lost!") def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self.shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() def connector_needs_reconnect(self, connector): if connector in self.dispatcher_connectors.values(): return True else: for dispatcher, old_connector in self.old_connectors.items(): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown() return False def shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() else: self.update_dispatchers([]) diff --git a/mediaproxy/scheduler.py b/mediaproxy/scheduler.py index 5e7ee37..48eb6db 100644 --- a/mediaproxy/scheduler.py +++ b/mediaproxy/scheduler.py @@ -1,48 +1,48 @@ -# Copyright (C) 2007-2008 Dan Pascu +# Copyright (C) 2007-2014 Dan Pascu # """Schedule calls on the twisted reactor""" __all__ = ['RecurrentCall', 'KeepRunning'] from time import time class KeepRunning: """Return this class from a recurrent function to indicate that it should keep running""" pass class RecurrentCall(object): """Execute a function repeatedly at the given interval, until signaled to stop""" def __init__(self, period, func, *args, **kwargs): from twisted.internet import reactor self.func = func self.args = args self.kwargs = kwargs self.period = period self.now = None self.next = None self.callid = reactor.callLater(period, self) def __call__(self): from twisted.internet import reactor self.callid = None if self.now is None: self.now = time() self.next = self.now + self.period else: self.now, self.next = self.next, self.next + self.period result = self.func(*self.args, **self.kwargs) if result is KeepRunning: delay = max(self.next-time(), 0) self.callid = reactor.callLater(delay, self) def cancel(self): if self.callid is not None: try: self.callid.cancel() except ValueError: pass self.callid = None diff --git a/mediaproxy/sipthor.py b/mediaproxy/sipthor.py index b022050..fe12300 100644 --- a/mediaproxy/sipthor.py +++ b/mediaproxy/sipthor.py @@ -1,65 +1,64 @@ -# Copyright (C) 2008 AG Projects -# Author: Ruud Klaver +# Copyright (C) 2008-2014 AG Projects # """SIP Thor backend""" from application import log from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL from thor.entities import ThorEntities, GenericThorEntity from thor.eventservice import EventServiceClient, ThorEvent from thor.tls import X509Credentials from mediaproxy import __version__ from mediaproxy.configuration import ThorNetworkConfig from mediaproxy.configuration.datatypes import DispatcherIPAddress from mediaproxy.relay import SRVMediaRelayBase if ThorNetworkConfig.domain is None: ## SIP Thor is installed but disabled. Fake an ImportError to start in standalone media relay mode. log.warn("SIP Thor is installed but disabled from the configuration") raise ImportError("SIP Thor is disabled") class SIPThorMediaRelayBase(EventServiceClient, SRVMediaRelayBase): topics = ["Thor.Members"] def __init__(self): self.node = GenericThorEntity(ThorNetworkConfig.node_ip, ["media_relay"], version=__version__) self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(cert_name='relay') credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) self.sipthor_dispatchers = [] self.additional_dispatchers = [] EventServiceClient.__init__(self, ThorNetworkConfig.domain, credentials) SRVMediaRelayBase.__init__(self) def handle_event(self, event): if not self.shutting_down: sip_proxy_ips = [node.ip for node in ThorEntities(event.message, role="sip_proxy")] self.sipthor_dispatchers = [(ip, DispatcherIPAddress.default_port) for ip in sip_proxy_ips] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def _cb_got_all(self, results): if not self.shutting_down: self.additional_dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def _handle_SIGHUP(self, *args): SRVMediaRelayBase._handle_SIGHUP(self, *args) def _handle_SIGINT(self, *args): SRVMediaRelayBase._handle_SIGINT(self, *args) def _handle_SIGTERM(self, *args): SRVMediaRelayBase._handle_SIGTERM(self, *args) def shutdown(self, graceful=False): raise NotImplementedError() diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py index 23078fd..091c0f2 100644 --- a/mediaproxy/tls.py +++ b/mediaproxy/tls.py @@ -1,86 +1,86 @@ -# Copyright (C) 2007-2008 AG Projects. +# Copyright (C) 2007-2014 AG Projects. # """TLS support""" __all__ = ['X509Credentials'] import os import stat from application.process import process from gnutls import crypto from gnutls.interfaces import twisted from mediaproxy.configuration import TLSConfig class FileDescriptor(object): def __init__(self, name, type): certs_path = os.path.normpath(TLSConfig.certs_path) self.path = os.path.join(certs_path, name) self.klass = type self.timestamp = 0 self.object = None def get(self): path = process.config_file(self.path) if path is None: raise RuntimeError("missing or unreadable file: %s" % self.path) mtime = os.stat(path)[stat.ST_MTIME] if self.timestamp < mtime: f = open(path) try: self.object = self.klass(f.read()) self.timestamp = mtime finally: f.close() return self.object class X509Entity(object): type = None def __init__(self, name_attr): self.name_attr = name_attr self.descriptors = {} def __get__(self, obj, type_=None): name = getattr(obj or type_, self.name_attr, None) if name is None: return None descriptor = self.descriptors.setdefault(name, FileDescriptor(name, self.type)) return descriptor.get() def __set__(self, obj, value): raise AttributeError("cannot set attribute") def __delete__(self, obj): raise AttributeError("cannot delete attribute") class X509Certificate(X509Entity): type = crypto.X509Certificate class X509PrivateKey(X509Entity): type = crypto.X509PrivateKey class X509CRL(X509Entity): type = crypto.X509CRL class X509Credentials(twisted.X509Credentials): """SIPThor X509 credentials""" X509cert_name = None ## will be defined by each instance X509key_name = None ## will be defined by each instance X509ca_name = 'ca.pem' X509crl_name = 'crl.pem' X509cert = X509Certificate(name_attr='X509cert_name') X509key = X509PrivateKey(name_attr='X509key_name') X509ca = X509Certificate(name_attr='X509ca_name') X509crl = X509CRL(name_attr='X509crl_name') def __init__(self, cert_name): self.X509cert_name = '%s.crt' % cert_name self.X509key_name = '%s.key' % cert_name twisted.X509Credentials.__init__(self, self.X509cert, self.X509key, [self.X509ca], [self.X509crl]) self.verify_peer = True self.verify_period = TLSConfig.verify_interval diff --git a/setup.py b/setup.py index 6848115..e625258 100644 --- a/setup.py +++ b/setup.py @@ -1,77 +1,75 @@ #!/usr/bin/python import re import sys from ctypes import CDLL from ctypes.util import find_library from distutils.core import setup as _setup, Extension # Get the title and description from README readme = open('README').read() title, description = re.findall(r'^\s*([^\n]+)\s+(.*)$', readme, re.DOTALL)[0] def get_version(): return re.search(r"""__version__\s+=\s+(?P['"])(?P.+?)(?P=quote)""", open('mediaproxy/__init__.py').read()).group('version') def get_link_libraries(): libiptc = CDLL(find_library('iptc')) libip4tc = CDLL(find_library('ip4tc')) try: libiptc.iptc_commit except AttributeError: try: libip4tc.iptc_commit except AttributeError: print 'No valid iptc library was found on the system. Please install iptables development libraries.' sys.exit(1) else: return ['netfilter_conntrack', 'ip4tc'] else: return ['netfilter_conntrack', 'iptc'] def setup(*args, **kwargs): """Mangle setup to ignore media-relay on non-linux platforms""" if not sys.platform.startswith('linux2'): print "WARNING: skipping the media relay component as this is a non-linux platform" kwargs.pop('ext_modules', None) kwargs['scripts'].remove('media-relay') _setup(*args, **kwargs) setup(name = "mediaproxy", version = get_version(), - author = "Ruud Klaver", + author = "AG Projects", author_email = "support@ag-projects.com", - maintainer = "AG Projects", - maintainer_email = "support@ag-projects.com", url = "http://www.ag-projects.com/MediaProxy.html", description = title, long_description = description, license = "GPL", platforms = ["Linux"], classifiers = [ #"Development Status :: 1 - Planning", #"Development Status :: 2 - Pre-Alpha", #"Development Status :: 3 - Alpha", #"Development Status :: 4 - Beta", "Development Status :: 5 - Production/Stable", #"Development Status :: 6 - Mature", #"Development Status :: 7 - Inactive", "Intended Audience :: Service Providers", "License :: GNU General Public License (GPL)", "Operating System :: POSIX :: Linux", "Programming Language :: Python", "Programming Language :: C" ], packages = ['mediaproxy', 'mediaproxy.configuration', 'mediaproxy.interfaces', 'mediaproxy.interfaces.accounting', 'mediaproxy.interfaces.system'], scripts = ['media-relay', 'media-dispatcher'], ext_modules = [ Extension(name = 'mediaproxy.interfaces.system._conntrack', sources = ['mediaproxy/interfaces/system/_conntrack.c'], libraries = get_link_libraries(), define_macros = [('MODULE_VERSION', get_version())]) ] )