diff --git a/INSTALL b/INSTALL index cb015ed..f8ba6fc 100644 --- a/INSTALL +++ b/INSTALL @@ -1,221 +1,221 @@ MediaProxy installation procedure --------------------------------- Copyright (c) 2008-2012 AG Projects http://ag-projects.com Authors: Ruud Klaver, Dan Pascu, Saul Ibarra Home page: http://mediaproxy.ag-projects.com For the list of changes between revisions please consult debian/changelog For information about the MediaProxy architecture, configuring and running the dispatcher and the relay, as well as details about the supported features and how to use them, please consult the README file. Prerequisites ------------- In order to build and install, MediaProxy has the following requirements: - Linux (at least 2.6.18) with the following features compiled in: - netfilter support - connection tracking support - connection tracking netlink interface - connection tracking event notification API - netfilter "NOTRACK" target support - netfilter "CONNMARK" target support - netfilter "connmark" match support - IPv4 connection tracking support - IP tables support - IP tables Full NAT support Distribution provided kernel images should normally provide of all these features as modules. The Debian kernel images have all these features available and can be used out of the box. - libnetfilter-conntrack (at least version 0.0.89) Most of the Linux distributions separate a library package into runtime and development packages. To build MediaProxy, the development version is needed (it usually has a -dev suffix in the package name). - iptables (at least version 1.4.3) To build MediaProxy, the development version is needed (usually has a -dev suffix in the package name). For running the development package is not needed, only plain iptables is enough. - Python (at least 2.5) http://python.org - Twisted framework (at least 2.5.0 with epollreactor support) http://twistedmatrix.com - python-zopeinterface (this is also a requirement for twisted) http://zope.org/Products/ZopeInterface - python-application (at least 1.2.8) http://pypi.python.org/pypi/python-application - GNU-TLS http://www.gnu.org/software/gnutls - - python-gnutls (at least 1.1.8) + - python-gnutls (at least 3.0.0) http://pypi.python.org/pypi/python-gnutls - python-cjson http://pypi.python.org/pypi/python-cjson For the database accounting module: - SQLObject http://sqlobject.org For the RADIUS accounting module: - pyrad (at least 1.1) http://www.wiggy.net/code/pyrad/ Installation ------------ For people running Debian or Ubuntu on an i386 or amd64 architecture there are official public repositories provided by AG Projects. Modify your /etc/apt/sources.list depending on the distribution you are using, check here for the appropriate lines: http://mediaproxy.ag-projects.com/wiki/InstallationGuide Install the AG Projects debian software signing key: wget http://download.ag-projects.com/agp-debian-gpg.key apt-key add agp-debian-gpg.key After that, run: apt-get update apt-get install mediaproxy-dispatcher mediaproxy-relay mediaproxy-web-sessions to install all the packages, or you can install only the packages you actually need on that specific system. In case you want to build your own, please look below to Packaging section. Installing from source ---------------------- When installing from source, first make sure the above mentioned prerequisites are installed. If the distribution you are running has them already packaged, you should install the distribution provided packages, else you'll have to install them from source. If you install them as packages, make sure that you also install the development versions for python and libnetfilter-conntrack in order to be able to build MediaProxy. If you have to install something from source, please consult the installation instructions for that specific package in order to find out how to install it. For python packages there is a simple method to install them by running easy_install (make sure to run them as root): easy_install twisted easy_install zope.interface easy_install python-application easy_install python-cjson easy_install python-gnutls # this needs libgnutls-dev >= 2.4.1 installed easy_install sqlobject easy_install pyrad All of the above should work out of the box, except python-gnutls which needs libgnutls-dev at least version 2.4.1 to be installed to succeed. An alternative method to install the python packages is to download, unpack and run (as root): ./setup.py build; ./setup.py install for each of them in the directories where they were unpacked. It should be noted that this only needs to be done for the packages that are not provided already by your distribution, otherwise it is recommended to use the distribution provided packages unless they do not meet the minimum version requirements mentioned above or if they exhibit problems at runtime. After all the prerequisites are installed, MediaProxy can be installed either as a system wide package or in a standalone directory. 1. To install it as a system wide package, run (as root): ./setup.py build ./setup.py install in the directory where you unpacked MediaProxy. 2. To install in a standalone directory, unpack MediaProxy to the directory where you want it placed. Then change to that directory and run: ./build_inplace After this MediaProxy components can be run from that directory. In both cases, you can use the Debian startup scripts in the Debian subdirectory, mediaproxy-dispatcher.init and mediaproxy-relay.init as examples to create your own startup scripts for your distribution. Packaging --------- The MediaProxy source already includes the necessary files to build Debian packages. They should probably also work without changes for Ubuntu, though they have not been tested with it. To build Debian/Ubuntu packages, you can do the following (this is known to work with Debian testing and unstable and should work without changes in Ubuntu 8.04 Hardy as well, though they were not tested there): apt-get update apt-get install devscripts cdbs debhelper python-all-dev python-support \ iptables-dev libnetfilter-conntrack-dev python-application python-cjson \ python-gnutls python-twisted-core python-twisted-names \ python-zopeinterface python-pyrad python-sqlobject then unpack MediaProxy and in the directory where it was unpacked run: debuild -us -uc You can safely ignore the pgp signing error at the end of the build process, that is only because you do not have the pgp key for the person who is listed as maintainer for the package. The packages are build fine even if they are not signed. After building them, you can find the .deb packages in the parent directory, from where you can install them using dpkg: cd ../ dpkg -i mediaproxy-*.deb or you can install just the ones you need on that particular system. Please note that mediaproxy-dispatcher and mediaproxy-relay both depend on mediaproxy-common so you have to install it too along with either of them. Configuration file ------------------ The configuration file is named config.ini and a config.ini.sample file is provided in the source. You can copy config.ini.sample to config.ini and modify it to suit your needs. The sample configuration file is commented and self-explanatory. Both the dispatcher and the relay read their configuration from the same file but from different sections. If either of them is not installed on a given system, its specific sections are ignored, so you only need to configure the sections for the installed component(s). MediaProxy will look for both a local configuration file, which is placed in the same directory as the media-relay and media-dispather scripts, and a system configuration file which is placed in /etc/mediaproxy/ Even though a local configuration file can be used in any case, it only makes sense to be used in the standalone installation case, where MediaProxy lives in its own directory and there is a reason to contain all the MediaProxy related files to a single directory. For system wide installations, where the media-relay and media-dispatcher scripts reside in /usr/bin or /usr/local/bin, it makes little sense to place a local configuration file there, so in this case using the system configuration file in /etc/mediaproxy/config.ini is recommended. When both configuration files are present, both will be read and the settings in the local configuration will override the ones in the system configuration. diff --git a/debian/control b/debian/control index 082da5b..82ae486 100644 --- a/debian/control +++ b/debian/control @@ -1,127 +1,127 @@ Source: mediaproxy Section: net Priority: optional Maintainer: Dan Pascu Uploaders: Adrian Georgescu , Saul Ibarra Build-Depends: debhelper (>= 7.3.5), python-all-dev (>= 2.7), python-all-dbg (>= 2.7), libnetfilter-conntrack-dev (>= 0.0.89), iptables-dev (>=1.4.3) Standards-Version: 3.9.6 Package: mediaproxy-common Architecture: any -Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends}, iptables (>= 1.4.3 ), python-application (>= 1.2.8), python-cjson, python-gnutls (>= 1.1.8), python-twisted-core (>= 2.5.0), python-twisted-names, python-zope.interface +Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends}, iptables (>= 1.4.3 ), python-application (>= 1.2.8), python-cjson, python-gnutls (>= 3.0.0), python-twisted-core (>= 2.5.0), python-twisted-names, python-zope.interface Recommends: python-pyrad (>= 1.1), python-sqlobject Description: MediaProxy common files MediaProxy is a distributed far end NAT traversal solution for media streams of SIP calls. MediaProxy has a dispatcher running on the same host with the OpenSIPS proxy and multiple media relays distributed over the network. The media relays work by manipulating conntrack rules in the Linux kernel to create paths that forward the media streams between the 2 SIP user agents participating in the call. Because it avoids to copy stream data from kernel space to user space and back to kernel space like other implementations, MediaProxy can handle much more media streams at a time, limited only to the network interface bandwidth and the Linux kernel network layer processing speed. . MediaProxy features secure encrypted communication between the dispatcher and the relays, advanced accounting capabilities using multiple backends, support for any combination of audio and video streams, realtime statistics, T.38 fax support as well as automatic load balancing and redundancy among the active relays. . This package includes files common to all MediaProxy packages. Package: mediaproxy-common-dbg Architecture: any Priority: extra Section: debug Depends: ${shlibs:Depends}, ${misc:Depends}, mediaproxy-common (= ${binary:Version}) Recommends: python-all-dbg Description: MediaProxy common files MediaProxy is a distributed far end NAT traversal solution for media streams of SIP calls. MediaProxy has a dispatcher running on the same host with the OpenSIPS proxy and multiple media relays distributed over the network. The media relays work by manipulating conntrack rules in the Linux kernel to create paths that forward the media streams between the 2 SIP user agents participating in the call. Because it avoids to copy stream data from kernel space to user space and back to kernel space like other implementations, MediaProxy can handle much more media streams at a time, limited only to the network interface bandwidth and the Linux kernel network layer processing speed. . MediaProxy features secure encrypted communication between the dispatcher and the relays, advanced accounting capabilities using multiple backends, support for any combination of audio and video streams, realtime statistics, T.38 fax support as well as automatic load balancing and redundancy among the active relays. . This package includes files common to all MediaProxy packages. Package: mediaproxy-dispatcher Architecture: all Depends: ${python:Depends}, ${misc:Depends}, mediaproxy-common (>= ${source:Version}), lsb-base Description: MediaProxy dispatcher MediaProxy is a distributed far end NAT traversal solution for media streams of SIP calls. MediaProxy has a dispatcher running on the same host with the OpenSIPS proxy and multiple media relays distributed over the network. The media relays work by manipulating conntrack rules in the Linux kernel to create paths that forward the media streams between the 2 SIP user agents participating in the call. Because it avoids to copy stream data from kernel space to user space and back to kernel space like other implementations, MediaProxy can handle much more media streams at a time, limited only to the network interface bandwidth and the Linux kernel network layer processing speed. . MediaProxy features secure encrypted communication between the dispatcher and the relays, advanced accounting capabilities using multiple backends, support for any combination of audio and video streams, realtime statistics, T.38 fax support as well as automatic load balancing and redundancy among the active relays. . This package provides the MediaProxy dispatcher. Package: mediaproxy-relay Architecture: all Depends: ${python:Depends}, ${misc:Depends}, mediaproxy-common (>= ${source:Version}), lsb-base Description: MediaProxy relay MediaProxy is a distributed far end NAT traversal solution for media streams of SIP calls. MediaProxy has a dispatcher running on the same host with the OpenSIPS proxy and multiple media relays distributed over the network. The media relays work by manipulating conntrack rules in the Linux kernel to create paths that forward the media streams between the 2 SIP user agents participating in the call. Because it avoids to copy stream data from kernel space to user space and back to kernel space like other implementations, MediaProxy can handle much more media streams at a time, limited only to the network interface bandwidth and the Linux kernel network layer processing speed. . MediaProxy features secure encrypted communication between the dispatcher and the relays, advanced accounting capabilities using multiple backends, support for any combination of audio and video streams, realtime statistics, T.38 fax support as well as automatic load balancing and redundancy among the active relays. . This package provides the MediaProxy relay. Package: mediaproxy-web-sessions Architecture: all Depends: ${misc:Depends}, php5 | php5-cgi Description: MediaProxy sessions web page MediaProxy is a distributed far end NAT traversal solution for media streams of SIP calls. MediaProxy has a dispatcher running on the same host with the OpenSIPS proxy and multiple media relays distributed over the network. The media relays work by manipulating conntrack rules in the Linux kernel to create paths that forward the media streams between the 2 SIP user agents participating in the call. Because it avoids to copy stream data from kernel space to user space and back to kernel space like other implementations, MediaProxy can handle much more media streams at a time, limited only to the network interface bandwidth and the Linux kernel network layer processing speed. . MediaProxy features secure encrypted communication between the dispatcher and the relays, advanced accounting capabilities using multiple backends, support for any combination of audio and video streams, realtime statistics, T.38 fax support as well as automatic load balancing and redundancy among the active relays. . This package provides a simple web page to display active media sessions. diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 8d58d01..0a9e7b1 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,548 +1,550 @@ """Implementation of the MediaProxy dispatcher""" import random import signal import cPickle as pickle import cjson from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError +from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials log.msg("Twisted is using %s" % reactor.__module__.rsplit('.', 1)[-1]) class ControlProtocol(LineOnlyReceiver): noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason): log.debug("Connection to %s lost: %s" % (self.description, reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + "\r\n") def _relay_error(self, failure): failure.trap(RelayError) log.error(failure.value) self.transport.write("error\r\n") def _catch_all(self, failure): log.error(failure.getBriefTraceback()) self.transport.write("error\r\n") def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._relay_error) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): description = "OpenSIPS" def __init__(self): self.line_buf = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == "": if self.line_buf: self.in_progress += 1 defer = self.factory.dispatcher.send_command(self.line_buf[0], self.line_buf[1:]) self._add_callbacks(defer) self.line_buf = [] elif not line.endswith(": "): self.line_buf.append(line) class ManagementControlProtocol(ControlProtocol): description = "Management interface client" def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ["quit", "exit"]: self.transport.loseConnection() elif line == "summary": defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == "sessions": defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == "version": self.reply(__version__) else: log.error("Unknown command on management interface: %s" % line) self.reply("error") class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): prot = Factory.buildProtocol(self, addr) self.protocols.append(prot) return prot def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): noisy = False MAX_LENGTH = 4096*1024 ## (4MB) def __init__(self): self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command, headers): log.debug('Issuing "%s" command to relay at %s' % (command, self.ip)) seq = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, seq, defer) self.commands[seq] = (command, defer, timer) self.transport.write("\r\n".join([" ".join([command, seq])] + headers + ["", ""])) return defer def _timeout(self, seq, defer): del self.commands[seq] defer.errback(RelayError("Relay at %s timed out" % self.ip)) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(" ", 1) except ValueError: first = line rest = "" if first == "expired": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: log.error("Unknown session with call_id %s expired at relay %s" % (call_id, self.ip)) return if session.relay_ip != self.ip: log.error("session with call_id %s expired at relay %s, but is actually at relay %s, ignoring" % (call_id, self.ip, session.relay_ip)) return all_streams_ice = all(stream_info["status"] == "unselected ICE candidate" for stream_info in stats["streams"]) if all_streams_ice: log.msg("session with call_id %s from relay %s removed because ICE was used" % (call_id, session.relay_ip)) stats["timed_out"] = False else: log.msg("session with call_id %s from relay %s did timeout" % (call_id, session.relay_ip)) stats["timed_out"] = True stats["dialog_id"] = session.dialog_id stats["all_streams_ice"] = all_streams_ice self.factory.dispatcher.update_statistics(stats) if session.dialog_id is not None and stats["start_time"] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == "ping": if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.transport.write("pong\r\n") return try: command, defer, timer = self.commands.pop(first) except KeyError: log.error("Got unexpected response from relay at %s: %s" % (self.ip, line)) return timer.cancel() if rest == "error": defer.errback(RelayError("Received error from relay at %s in response to `%s' command" % (self.ip, command))) elif rest == "halting": self.halting = True defer.errback(RelayError("Relay at %s is shutting down" % self.ip)) elif command == "remove": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats["dialog_id"] = session.dialog_id stats["timed_out"] = False self.factory.dispatcher.update_statistics(stats) del self.factory.sessions[call_id] defer.callback("removed") else: # update command defer.callback(rest) def connectionLost(self, reason): if reason.type == ConnectionDone: log.msg("Connection with relay at %s was closed" % self.ip) elif reason.type == ConnectionReplaced: log.warn("Old connection with relay at %s was lost" % self.ip) else: log.error("Connection with relay at %s was lost: %s" % (self.ip, reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() defer.errback(RelayError("Relay at %s disconnected" % self.ip)) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) class DialogID(str): def __new__(cls, did): if did is None: return None try: h_entry, h_id = did.split(':') except: log.error("invalid dialog_id value: `%s'" % did) return None instance = str.__new__(cls, did) instance.h_entry = h_entry instance.h_id = h_id return instance class RelaySession(object): def __init__(self, relay_ip, command_headers): self.relay_ip = relay_ip self.dialog_id = DialogID(command_headers.get('dialog_id')) self.expire_time = None class RelayFactory(Factory): noisy = False protocol = RelayServerProtocol def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False state_file = process.runtime_file("dispatcher_state") try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warn("found %d expired sessions which were not removed during the last %d hours" % (len(obsolete), round(limit/3600.0))) return KeepRunning def buildProtocol(self, addr): ip = addr.host log.debug("Connection from relay at %s" % ip) prot = Factory.buildProtocol(self, addr) prot.ip = ip return prot def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: log.warn("Relay at %s reconnected, closing old connection" % relay.ip) reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected"))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command("sessions", []) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session["call_id"] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: log.warn("Session %s is no longer on relay %s, statistics are probably lost" % (session_id, relay_ip)) if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command, headers): try: parsed_headers = dict(header.split(": ", 1) for header in headers) except: raise RelayError("Could not parse headers from OpenSIPs") try: call_id = parsed_headers["call_id"] except KeyError: raise RelayError("Missing call_id header") session = self.sessions.get(call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: raise RelayError("Relay for this session (%s) is no longer connected" % relay) return self.relays[relay].send_command(command, headers) ## We do not have a session for this call_id or the session is already expired if command == "update": preferred_relay = parsed_headers.get("media_relay") try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warn("user requested media_relay %s is not available" % preferred_relay) defer = self._try_next(try_relays, command, headers) defer.addCallback(self._add_session, try_relays, call_id, parsed_headers) return defer elif command == 'remove' and session: ## This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[call_id] return 'removed' else: raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id)) def _add_session(self, result, try_relays, call_id, parsed_headers): self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers) return result def _relay_error(self, failure, try_relays, command, headers): failure.trap(RelayError) failed_relay = try_relays.popleft() log.warn("Relay %s failed: %s" % (failed_relay, failure.value)) return self._try_next(try_relays, command, headers) def _try_next(self, try_relays, command, headers): if len(try_relays) == 0: raise RelayError("No suitable relay found") defer = try_relays[0].send_command(command, headers) defer.addErrback(self._relay_error, try_relays, command, headers) return defer def get_summary(self): defer = DeferredList([relay.send_command("summary", []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, ip): log.error("Error processing query at relay %s: %s" % (ip, failure.value)) return cjson.encode(dict(status="error", ip=ip)) def _got_summaries(self, results): return "[%s]" % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): defer = DeferredList([relay.send_command("sessions", []) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer def _got_statistics(self, results): return "[%s]" % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result!='[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug("Doing cleanup for old relay %s" % ip) del self.cleanup_timers[ip] for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]: del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): pickle.dump(self.sessions, open(process.runtime_file("dispatcher_state"), "w")) class Dispatcher(object): def __init__(self): self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') + self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen - self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.cred, interface=dispatcher_addr) + self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime_file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: - self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.cred, interface=management_addr) + self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command, headers): return maybeDeferred(self.relay_factory.send_command, command, headers) def update_statistics(self, stats): log.debug("Got statistics: %s" % stats) if stats["start_time"] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.error("An unhandled error occured while doing accounting: %s" % e) log.err() def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down.") reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self._shutdown) def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index 277e59f..da8b997 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,386 +1,388 @@ """Implementation of the MediaProxy relay""" import cjson import signal import resource from time import time try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError("mandatory epoll reactor support is missing from the twisted framework") from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError +from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials ## Increase the system limit for the maximum number of open file descriptors ## to be able to handle connections to all ports in port_range try: fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError("Cannot set resource limit for maximum open file descriptors to %d" % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.msg("Set resource limit for maximum open file descriptors to %d" % fd_limit) class RelayClientProtocol(LineOnlyReceiver): noisy = False required_headers = {'update': set(['call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type']), 'remove': set(['call_id', 'from_tag']), 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: # 3 keepalives in a row didn't get an answer. assume connection is down. log.error("missed 3 keepalive answers in a row. assuming the connection is down.") # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None self.transport.write("ping\r\n") self._queued_keepalives += 1 return KeepRunning def connectionMade(self): peer = self.transport.getPeer() log.debug("Connected to dispatcher at %s:%d" % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not RelayConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error("Could not decode command/sequence number pair from dispatcher: %s" % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error("Unknown command: %s" % command) self.transport.write("%s error\r\n" % seq) elif line == "": try: missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error("Missing mandatory header '%s' from '%s' command" % (header, self.command)) response = "error" else: try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except: log.err() response = "error" finally: self.transport.write("%s %s\r\n" % (self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error("Unable to parse header: %s" % line) else: try: self.headers[name] = value except DecodingError, e: log.error("Could not decode header: %s" % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() if reason.type != ConnectionDone: log.error("Connection with dispatcher at %(host)s:%(port)d was lost: %%s" % connector.__dict__ % reason.value) else: log.msg("Connection with dispatcher at %(host)s:%(port)d was closed" % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): def __init__(self): self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning def _cb_got_srv(self, (answers, auth, add), port): for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) reactor.run(installSignalHandlers=False) def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down after all sessions have expired.") reactor.callFromThread(self.shutdown, graceful=True) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self.shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self.shutdown) def shutdown(self, graceful=False): raise NotImplementedError() def on_shutdown(self): pass def _shutdown(self): reactor.stop() self.on_shutdown() try: from mediaproxy.sipthor import SIPThorMediaRelayBase MediaRelayBase = SIPThorMediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') + self.tls_context = TLSContext(self.cred) self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() MediaRelayBase.__init__(self) @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): if new_dispatcher in self.old_connectors.iterkeys(): log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: log.debug('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) - self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.cred) + self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context) for old_dispatcher in self.dispatchers.difference(dispatchers): log.debug('Removing old dispatcher at %s:%d' % old_dispatcher) self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == "summary": summary = {'ip' : RelayConfig.relay_ip, 'version' : __version__, 'status' : self.status, 'uptime' : int(time() - self.start_time), 'session_count' : len(self.session_manager.sessions), 'stream_count' : self.session_manager.stream_count, 'bps_relayed' : self.session_manager.bps_relayed} return cjson.encode(summary) elif command == "sessions": return cjson.encode(self.session_manager.statistics) elif command == "update": if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): log.debug("cannot add new session: media-relay is shutting down") return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error("Could not reserve relay ports for session, all allocated ports are being used") return "error" if local_media: return " ".join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) else: # remove session = self.session_manager.remove_session(**headers) if session is None: return "error" else: return cjson.encode(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == "connected": connector.transport.write(" ".join(["expired", cjson.encode(session.statistics)]) + "\r\n") else: log.warn("dispatcher for expired session is no longer online, statistics are lost!") def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self.shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() def connector_needs_reconnect(self, connector): if connector in self.dispatcher_connectors.values(): return True else: for dispatcher, old_connector in self.old_connectors.items(): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown() return False def shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() else: self.update_dispatchers([]) diff --git a/mediaproxy/sipthor.py b/mediaproxy/sipthor.py index 5d2ed87..c3bb199 100644 --- a/mediaproxy/sipthor.py +++ b/mediaproxy/sipthor.py @@ -1,62 +1,60 @@ """SIP Thor backend""" from application import log -from gnutls.constants import COMP_LZO, COMP_DEFLATE, COMP_NULL from thor.entities import ThorEntities, GenericThorEntity from thor.eventservice import EventServiceClient, ThorEvent from thor.tls import X509Credentials from mediaproxy import __version__ from mediaproxy.configuration import ThorNetworkConfig from mediaproxy.configuration.datatypes import DispatcherIPAddress from mediaproxy.relay import SRVMediaRelayBase if ThorNetworkConfig.domain is None: ## SIP Thor is installed but disabled. Fake an ImportError to start in standalone media relay mode. log.warn("SIP Thor is installed but disabled from the configuration") raise ImportError("SIP Thor is disabled") class SIPThorMediaRelayBase(EventServiceClient, SRVMediaRelayBase): topics = ["Thor.Members"] def __init__(self): self.node = GenericThorEntity(ThorNetworkConfig.node_ip, ["media_relay"], version=__version__) self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(cert_name='relay') - credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) self.sipthor_dispatchers = [] self.additional_dispatchers = [] EventServiceClient.__init__(self, ThorNetworkConfig.domain, credentials) SRVMediaRelayBase.__init__(self) def handle_event(self, event): if not self.shutting_down: sip_proxy_ips = [node.ip for node in ThorEntities(event.message, role="sip_proxy")] self.sipthor_dispatchers = [(ip, DispatcherIPAddress.default_port) for ip in sip_proxy_ips] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def _cb_got_all(self, results): if not self.shutting_down: self.additional_dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def _handle_SIGHUP(self, *args): SRVMediaRelayBase._handle_SIGHUP(self, *args) def _handle_SIGINT(self, *args): SRVMediaRelayBase._handle_SIGINT(self, *args) def _handle_SIGTERM(self, *args): SRVMediaRelayBase._handle_SIGTERM(self, *args) def shutdown(self, graceful=False): raise NotImplementedError()