diff --git a/config.ini.sample b/config.ini.sample index 9660118..cc0efe9 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,246 +1,240 @@ [Relay] ; A list of dispatchers to connect to, separated by spaces. The format is ; "host[:port] [host[:port] ...]". If a port is not specified the default port ; of 25060 will be used. "host" can be one of the following: ; - A domain name that has a SRV record for a SIP proxy, i.e. at ; "_sip._udp.". If the DNS lookup for this succeeds the relay ; will connect to the IP address of the SIP proxy on the port specified in ; this configuration. ; - A hostname. The lookup for this will be performed if the SRV lookup ; fails. ; - An IP address. The relay will connect directly to this address. ; Both the SRV and hostname lookups will be periodically refreshed (see ; "dns_check_interval" below). ; ;dispatchers = example.com 1.2.3.4:12345 ; Specify extra checks to be performed on the dispatcher TLS credentials before ; considering the connection with the dispatcher succesful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; dispatcher certificate. A wildcard (*) can be used in the value at the ; beginning or the end of the string to indicate that the corresponding ; attribute from the dispatcher certificate must end with respectively to ; start with the given string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:*dispatcher ; means that a connection with a dispatcher will only be accepted if the ; dispatcher certificate subject has organization set to "AG Projects" and ; the common name ends with "dispatcher". To specify that no additional ; identity checks need to be performed, use the keyword None. If passport ; is None, then only the certificate signature is verified agains the ; certificate authority in tls/ca.pem (signature is always verified even ; when passport is None). ; ; Default value is None. ; ;passport = None ; The host IP address used for relaying streams. The default for this value ; is to use the IP address of the interface that has the default route. This ; is the most appropriate choice for almost any situation. Unless you need to ; use a very specific interface, which is not the default one, there is no need ; to set this option. Leave this option commented to use the default value. ;relay_ip = ; The host IP address to return when a session is allocated in the relay. This ; could be of use in case the relay is behind NAT but it has a 1 to 1 mapping ; with a public IP address, like Amazon EC2, for example. ;advertised_ip = ; The port range to use for relaying media streams in the form start:end with ; start and end being even numbers in the [1024, 65536] range and start < end ; The default range is 50000:60000. You should allocate 4 times the number of ; streams you plan for the relay to handle simultaneously. The default range ; having 10000 ports, is able to handle up to 2500 streams. ; ;port_range = 50000:60000 ; The minimum level log messages need to have in order to appear in syslog ; or on the console, depending on the mode the relay is running in. ; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or ; DEBUG. ;log_level = DEBUG ; The amount of time to wait for a stream in a new SDP offer to start sending ; data before the relay decides that it has timed out. The default value is 90 ; seconds. This only applies to the initial setup stage, before the first ; packet for a stream is received (from both ends). After the stream is started ; and the conntrack rule is in place, the idle timeout (how long before the ; conntrack rule expires when no traffic is received) is controlled by a kernel ; setting that defaults to 180 seconds and can be adjusted in: ; /proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream ; ;stream_timeout = 90 ; Amount of time a call can be on hold before it is declared expired by the ; relay. The default value is 7200 seconds (2 hours). ; ;on_hold_timeout = 7200 ; How often to check in DNS if the SRV and A records for the dispatcher have ; changed. Interval is in seconds and the default value is 60 seconds. ; ;dns_check_interval = 60 ; If the relay cannot connect to a dispatcher is should retry after this ; amount of seconds. The default value is 10 seconds. ; ;reconnect_delay = 10 ; How often to sample the aggregate ammount of data processed by the relay, in ; order to compute an average of the relayed traffic over that period. The ; value is expressed in seconds and the default value is 15 seconds. ; Use 0 to disable it in case you have to many streams processed by the relay ; and it warns you in syslog that gathering this information takes too long. ; ;traffic_sampling_period = 15 ; Specify a list of network ranges (in CIDR notation) for which media is relayed ; even if no packet was received from the endpoint and the IP address is private. ;routable_private_ranges = 192.168.1.0/24 [Dispatcher] ; Local socket on which to communicate with OpenSIPS. The OpenSIPS mediaproxy ; module should be configured to connect to this socket. If a relative path, ; /var/run/mediaproxy will be prepended. Default value is dispatcher.sock. ; ;socket_path = dispatcher.sock ; Listen address for incoming connections from the relays. The format is ; "ip[:port]". If the ip is "0.0.0.0" or the keyword "any", the dispatcher ; will listen on all interfaces of this host. If the port is not specified, ; the dispatcher will listen on the default port of 25060. ; ;listen = 0.0.0.0 ; Listen address for incoming management interface connections. Clients can ; connect to this and issue commands to query the status of the relays and ; their sessions. The format is "ip[:port]". If the ip is "0.0.0.0" or the ; keyword "any", the dispatcher will listen on all interfaces of this host. ; If the port is not specified, the dispatcher will listen on the default ; port of 25061. ; ;listen_management = 0.0.0.0 ; Whether or not to use TLS on the management interface. Note that the same ; TLS credentials are used for both the relay and the management interface ; connections. ; ; Default value is yes. ; ;management_use_tls = yes ; Specify extra checks to be performed on the relay TLS credentials before ; considering the connection with the relay succesful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; relay certificate. A wildcard (*) can be used in the value at the beginning ; or the end of the string to indicate that the corresponding attribute from ; the relay certificate must end with respectively to start with the given ; string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:relay* ; means that a connection with a relay will only be accepted if the relay ; certificate subject has organization set to "AG Projects" and the common ; name starts with "relay". To specify that no additional identity checks ; need to be performed, use the keyword None. If passport is None, then only ; the certificate signature is verified agains the certificate authority in ; tls/ca.pem (signature is always verified even when passport is None). ; ; Default value is None. ; ;passport = None ; This option is similar to passport above, but applies to the management ; interface connections instead of relay connections. It specifies extra ; checks to be performed on the TLS credentials suplied by an entity that ; connects to the management interface. Please consult passport above for ; a detailed description of the possible values for this option. ; ; If management_use_tls is false, this option is ignored. ; ; Default value is None. ; ;management_passport = None ; The minimum level log messages need to have in order to appear in syslog ; or on the console, depending on the mode the dispatcher is running in. ; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or ; DEBUG. ;log_level = DEBUG ; Timeout value in second for individual relays. When a command is sent from ; the dispatcher to a relay it will wait this amount of seconds for a reply. ; The default is 5 seconds. ; ;relay_timeout = 5 ; A comma separated list of accounting backends that will be used to save ; accounting data with the session information once a session has finished. ; Currently 2 backends are available: "radius" and "database". If enabled ; they can be configured below in their respective sections. The default ; is to use no accounting backend. ; ;accounting = [TLS] ; Path to the certificates. If relative, it will be looked up in both the ; application directory (for a standalone installation) and /etc/mediaproxy, ; the former taking precedence if found. ; ;certs_path = tls ; How often (in seconds) to verify the peer certificate for expiration and ; revocation. Default value is 300 seconds (5 minutes) ; ;verify_interval = 300 [Database] ; This section needs to be configured if database accounting is enabled ; Database URI in the form: scheme://user:password@host/database ;dburi = mysql://mediaproxy:CHANGEME@localhost/mediaproxy ; Name for the table. ;sessions_table = media_sessions ; Column names. Columns are strings except for info which is a BLOB ; ;callid_column = call_id ;fromtag_column = from_tag ;totag_column = to_tag ;info_column = info [Radius] ; This section needs to be configured if radius accounting is enabled ; OpenSIPS RADIUS configuration file. All RADIUS cofiguration parameters will ; be read from this file, including dictionary files. ; ;config_file = /etc/opensips/radius/client.conf ; Additional dictionary file with MediaProxy specific attributes. ;additional_dictionary = radius/dictionary [OpenSIPS] ; Configure interaction between the media dispatcher and OpenSIPS ; Path to OpenSIPS's UNIX filesystem socket from the mi_datagram module. ;socket_path = /var/run/opensips/socket -; Maximum number of connections to open with OpenSIPS's mi_datagram socket. -; Please note that connections will be opened on a need basis depending on -; load, but never more than the number configured below. -; -;max_connections = 10 - diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py index 775aad4..7a756e1 100644 --- a/mediaproxy/configuration/__init__.py +++ b/mediaproxy/configuration/__init__.py @@ -1,88 +1,88 @@ from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress, NetworkRangeList from application.system import host from mediaproxy import configuration_filename from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator class DispatcherConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Dispatcher' socket_path = "dispatcher.sock" listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress("any")) listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress("any")) relay_timeout = 5 # How much to wait for an answer from a relay relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it cleanup_dead_relays_after = 43200 # 12 hours cleanup_expired_sessions_after = 86400 # 24 hours management_use_tls = True accounting = ConfigSetting(type=AccountingModuleList, value=[]) passport = ConfigSetting(type=X509NameValidator, value=None) management_passport = ConfigSetting(type=X509NameValidator, value=None) class RelayConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Relay' relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip) advertised_ip = ConfigSetting(type=IPAddress, value=None) stream_timeout = 90 on_hold_timeout = 7200 traffic_sampling_period = 15 userspace_transmit_every = 1 dispatchers = ConfigSetting(type=DispatcherAddressList, value=[]) port_range = PortRange("50000:60000") dns_check_interval = PositiveInteger(60) keepalive_interval = PositiveInteger(10) reconnect_delay = PositiveInteger(10) passport = ConfigSetting(type=X509NameValidator, value=None) routable_private_ranges = ConfigSetting(type=NetworkRangeList, value=[]) class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'OpenSIPS' socket_path = '/var/run/opensips/socket' - max_connections = 10 + location_table = 'location' class RadiusConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Radius' config_file = "/etc/opensips/radius/client.conf" additional_dictionary = "radius/dictionary" class DatabaseConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Database' dburi = "" sessions_table = "media_sessions" callid_column = "call_id" fromtag_column = "from_tag" totag_column = "to_tag" info_column = "info" class TLSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'TLS' certs_path = 'tls' verify_interval = 300 class ThorNetworkConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' domain = ConfigSetting(type=SIPThorDomain, value=None) node_ip = host.default_ip diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 0a9e7b1..680a444 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,550 +1,535 @@ """Implementation of the MediaProxy dispatcher""" import random import signal import cPickle as pickle import cjson from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials log.msg("Twisted is using %s" % reactor.__module__.rsplit('.', 1)[-1]) class ControlProtocol(LineOnlyReceiver): noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason): log.debug("Connection to %s lost: %s" % (self.description, reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + "\r\n") def _relay_error(self, failure): failure.trap(RelayError) log.error(failure.value) self.transport.write("error\r\n") def _catch_all(self, failure): log.error(failure.getBriefTraceback()) self.transport.write("error\r\n") def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._relay_error) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): description = "OpenSIPS" def __init__(self): self.line_buf = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == "": if self.line_buf: self.in_progress += 1 defer = self.factory.dispatcher.send_command(self.line_buf[0], self.line_buf[1:]) self._add_callbacks(defer) self.line_buf = [] elif not line.endswith(": "): self.line_buf.append(line) class ManagementControlProtocol(ControlProtocol): description = "Management interface client" def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ["quit", "exit"]: self.transport.loseConnection() elif line == "summary": defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == "sessions": defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == "version": self.reply(__version__) else: log.error("Unknown command on management interface: %s" % line) self.reply("error") class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): prot = Factory.buildProtocol(self, addr) self.protocols.append(prot) return prot def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): noisy = False MAX_LENGTH = 4096*1024 ## (4MB) def __init__(self): self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command, headers): log.debug('Issuing "%s" command to relay at %s' % (command, self.ip)) seq = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, seq, defer) self.commands[seq] = (command, defer, timer) self.transport.write("\r\n".join([" ".join([command, seq])] + headers + ["", ""])) return defer def _timeout(self, seq, defer): del self.commands[seq] defer.errback(RelayError("Relay at %s timed out" % self.ip)) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(" ", 1) except ValueError: first = line rest = "" if first == "expired": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: log.error("Unknown session with call_id %s expired at relay %s" % (call_id, self.ip)) return if session.relay_ip != self.ip: log.error("session with call_id %s expired at relay %s, but is actually at relay %s, ignoring" % (call_id, self.ip, session.relay_ip)) return all_streams_ice = all(stream_info["status"] == "unselected ICE candidate" for stream_info in stats["streams"]) if all_streams_ice: log.msg("session with call_id %s from relay %s removed because ICE was used" % (call_id, session.relay_ip)) stats["timed_out"] = False else: log.msg("session with call_id %s from relay %s did timeout" % (call_id, session.relay_ip)) stats["timed_out"] = True stats["dialog_id"] = session.dialog_id stats["all_streams_ice"] = all_streams_ice self.factory.dispatcher.update_statistics(stats) if session.dialog_id is not None and stats["start_time"] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == "ping": if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.transport.write("pong\r\n") return try: command, defer, timer = self.commands.pop(first) except KeyError: log.error("Got unexpected response from relay at %s: %s" % (self.ip, line)) return timer.cancel() if rest == "error": defer.errback(RelayError("Received error from relay at %s in response to `%s' command" % (self.ip, command))) elif rest == "halting": self.halting = True defer.errback(RelayError("Relay at %s is shutting down" % self.ip)) elif command == "remove": try: stats = cjson.decode(rest) except cjson.DecodeError: log.error("Error decoding JSON from relay at %s" % self.ip) else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats["dialog_id"] = session.dialog_id stats["timed_out"] = False self.factory.dispatcher.update_statistics(stats) del self.factory.sessions[call_id] defer.callback("removed") else: # update command defer.callback(rest) def connectionLost(self, reason): if reason.type == ConnectionDone: log.msg("Connection with relay at %s was closed" % self.ip) elif reason.type == ConnectionReplaced: log.warn("Old connection with relay at %s was lost" % self.ip) else: log.error("Connection with relay at %s was lost: %s" % (self.ip, reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() defer.errback(RelayError("Relay at %s disconnected" % self.ip)) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) -class DialogID(str): - def __new__(cls, did): - if did is None: - return None - try: - h_entry, h_id = did.split(':') - except: - log.error("invalid dialog_id value: `%s'" % did) - return None - instance = str.__new__(cls, did) - instance.h_entry = h_entry - instance.h_id = h_id - return instance - - class RelaySession(object): def __init__(self, relay_ip, command_headers): self.relay_ip = relay_ip - self.dialog_id = DialogID(command_headers.get('dialog_id')) + self.dialog_id = command_headers.get('dialog_id') self.expire_time = None class RelayFactory(Factory): noisy = False protocol = RelayServerProtocol def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False state_file = process.runtime_file("dispatcher_state") try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warn("found %d expired sessions which were not removed during the last %d hours" % (len(obsolete), round(limit/3600.0))) return KeepRunning def buildProtocol(self, addr): ip = addr.host log.debug("Connection from relay at %s" % ip) prot = Factory.buildProtocol(self, addr) prot.ip = ip return prot def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: log.warn("Relay at %s reconnected, closing old connection" % relay.ip) reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced("relay reconnected"))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command("sessions", []) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session["call_id"] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: log.warn("Session %s is no longer on relay %s, statistics are probably lost" % (session_id, relay_ip)) if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command, headers): try: parsed_headers = dict(header.split(": ", 1) for header in headers) except: raise RelayError("Could not parse headers from OpenSIPs") try: call_id = parsed_headers["call_id"] except KeyError: raise RelayError("Missing call_id header") session = self.sessions.get(call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: raise RelayError("Relay for this session (%s) is no longer connected" % relay) return self.relays[relay].send_command(command, headers) ## We do not have a session for this call_id or the session is already expired if command == "update": preferred_relay = parsed_headers.get("media_relay") try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warn("user requested media_relay %s is not available" % preferred_relay) defer = self._try_next(try_relays, command, headers) defer.addCallback(self._add_session, try_relays, call_id, parsed_headers) return defer elif command == 'remove' and session: ## This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[call_id] return 'removed' else: raise RelayError("Got `%s' command from OpenSIPS for unknown session with call-id `%s'" % (command, call_id)) def _add_session(self, result, try_relays, call_id, parsed_headers): self.sessions[call_id] = RelaySession(try_relays[0].ip, parsed_headers) return result def _relay_error(self, failure, try_relays, command, headers): failure.trap(RelayError) failed_relay = try_relays.popleft() log.warn("Relay %s failed: %s" % (failed_relay, failure.value)) return self._try_next(try_relays, command, headers) def _try_next(self, try_relays, command, headers): if len(try_relays) == 0: raise RelayError("No suitable relay found") defer = try_relays[0].send_command(command, headers) defer.addErrback(self._relay_error, try_relays, command, headers) return defer def get_summary(self): defer = DeferredList([relay.send_command("summary", []).addErrback(self._summary_error, ip) for ip, relay in self.relays.iteritems()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, ip): log.error("Error processing query at relay %s: %s" % (ip, failure.value)) return cjson.encode(dict(status="error", ip=ip)) def _got_summaries(self, results): return "[%s]" % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): defer = DeferredList([relay.send_command("sessions", []) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer def _got_statistics(self, results): return "[%s]" % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result!='[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug("Doing cleanup for old relay %s" % ip) del self.cleanup_timers[ip] for call_id in [call_id for call_id, session in self.sessions.items() if session.relay_ip == ip]: del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): pickle.dump(self.sessions, open(process.runtime_file("dispatcher_state"), "w")) class Dispatcher(object): def __init__(self): self.accounting = [__import__("mediaproxy.interfaces.accounting.%s" % mod.lower(), globals(), locals(), [""]).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime_file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command, headers): return maybeDeferred(self.relay_factory.send_command, command, headers) def update_statistics(self, stats): log.debug("Got statistics: %s" % stats) if stats["start_time"] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.error("An unhandled error occured while doing accounting: %s" % e) log.err() def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down.") reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self._shutdown) def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py index f9d6f8b..eb96e22 100644 --- a/mediaproxy/interfaces/opensips.py +++ b/mediaproxy/interfaces/opensips.py @@ -1,157 +1,240 @@ -"""The OpenSIPS Management Interface""" - - +import json import socket -from collections import deque -from twisted.internet import reactor, defer -from twisted.internet.protocol import DatagramProtocol -from twisted.internet.error import CannotListenError -from twisted.python.failure import Failure +import urlparse + +from abc import ABCMeta, abstractmethod, abstractproperty +from application import log from application.python.types import Singleton from application.process import process from application.system import unlink -from application import log +from random import getrandbits +from twisted.internet import reactor, defer +from twisted.internet.protocol import DatagramProtocol +from twisted.python.failure import Failure from mediaproxy.configuration import OpenSIPSConfig -class Error(Exception): pass -class CommandError(Error): pass -class TimeoutError(Error): pass -class NegativeReplyError(Error): pass +class Error(Exception): + pass + + +class TimeoutError(Error): + pass + + +class OpenSIPSError(Error): + pass + + +class NegativeReplyError(OpenSIPSError): + def __init__(self, code, message): + super(NegativeReplyError, self).__init__(code, message) + self.code = code + self.message = message + + def __repr__(self): + return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) + + def __str__(self): + return '[{0.code}] {0.message}'.format(self) class Request(object): - def __init__(self, command): - self.command = command + __metaclass__ = ABCMeta + + method = abstractproperty() + + @abstractmethod + def __init__(self, *args): + self.id = '{:x}'.format(getrandbits(32)) + self.args = list(args) self.deferred = defer.Deferred() + @property + def __data__(self): + return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) + + @abstractmethod + def process_response(self, response): + raise NotImplementedError + + +# noinspection PyAbstractClass +class BooleanRequest(Request): + """A request that returns True if successful, False otherwise""" + def process_response(self, response): + return not isinstance(response, Failure) + + +class AddressReload(BooleanRequest): + method = 'address_reload' + + def __init__(self): + super(AddressReload, self).__init__() + + +class DomainReload(BooleanRequest): + method = 'domain_reload' + + def __init__(self): + super(DomainReload, self).__init__() + + +class EndDialog(BooleanRequest): + method = 'dlg_end_dlg' + + def __init__(self, dialog_id): + super(EndDialog, self).__init__(dialog_id) + + +class RefreshWatchers(BooleanRequest): + method = 'refresh_watchers' + + def __init__(self, account, refresh_type): + super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) + + +class UpdateSubscriptions(BooleanRequest): + method = 'rls_update_subscriptions' + + def __init__(self, account): + super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) + + +class GetOnlineDevices(Request): + method = 'ul_show_contact' + + def __init__(self, account): + super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) + + def process_response(self, response): + if isinstance(response, Failure): + if response.type is NegativeReplyError and response.value.code == 404: + return [] + return response + return [ContactData(contact) for contact in response[u'Contacts']] + + +class ContactData(dict): + __fields__ = {u'contact', u'expires', u'received', u'user_agent'} + + def __init__(self, data): + super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__}) + self.setdefault(u'user_agent', None) + if u'received' in self: + parsed_received = urlparse.parse_qs(self[u'received']) + if u'target' in parsed_received: + self[u'NAT_contact'] = parsed_received[u'target'][0] + else: + self[u'NAT_contact'] = self[u'received'] + del self[u'received'] + else: + self[u'NAT_contact'] = self[u'contact'] + class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): - deferred = self.transport.deferred - if deferred is None or deferred.called: - return - # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan - if not data: - failure = Failure(CommandError("Empty reply from OpenSIPS")) - deferred.errback(failure) - return + log.debug('Got MI response: {}'.format(data)) try: - status, msg = data.split('\n', 1) + response = json.loads(data) except ValueError: - failure = Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")) - deferred.errback(failure) - return - if status.upper() == '200 OK': - deferred.callback(msg) + code, _, message = data.partition(' ') + try: + code = int(code) + except ValueError: + log.error('Received un-parsable response from OpenSIPS: {!r}'.format(data)) + return + # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. + # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to + # which request the response corresponds. The failed request will fail with timeout later. + if len(self.transport.requests) == 1: + _, request = self.transport.requests.popitem() + request.deferred.errback(Failure(NegativeReplyError(code, message))) + log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) + else: + log.error('Got non-JSON error reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: - deferred.errback(Failure(NegativeReplyError(status))) + try: + request_id = response['id'] + except KeyError: + log.error('MI JSON response from OpenSIPS lacks id field: {!r}'.format(response)) + return + if request_id not in self.transport.requests: + log.error('Received MI response from OpenSIPS with unknown id: {!r}'.format(response)) + return + request = self.transport.requests.pop(request_id) + if 'result' in response: + request.deferred.callback(response['result']) + elif 'error' in response: + log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) + request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) + else: + log.error('Got invalid MI response from OpenSIPS: {!r}'.format(response)) + request.deferred.errback(Failure(OpenSIPSError('Invalid response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 - def __init__(self, socket_path): - self._initialized = False + def __init__(self): + socket_path = process.runtime_file('opensips.sock') + unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) + self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) - self.transport.deferred = None ## placeholder for the deferred used by a request - self._initialized = True def close(self): - if self._initialized: - self.transport.stopListening() - unlink(self.path) - - def _get_deferred(self): - return self.transport.deferred - def _set_deferred(self, d): - self.transport.deferred = d - deferred = property(_get_deferred, _set_deferred) - - def _did_timeout(self, deferred): - if deferred.called: - return - deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) + for request in self.transport.requests.values(): + if not request.deferred.called: + request.deferred.errback(Error('shutting down')) + self.transport.requests.clear() + self.transport.stopListening() + unlink(self.path) def send(self, request): - self.deferred = request.deferred try: - self.transport.write(request.command, OpenSIPSConfig.socket_path) - except socket.error, why: - log.error("cannot write request to `%s': %s" % (OpenSIPSConfig.socket_path, why[1])) - self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS"))) + self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) + except socket.error as e: + log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) + request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: - reactor.callLater(self.timeout, self._did_timeout, self.deferred) - - -class UNIXSocketConnectionPool(object): - """Pool of UNIX socket connection to OpenSIPS""" - - def __init__(self, max_connections=10, pool_id=''): - assert max_connections > 0, 'maximum should be > 0' - self.max = max_connections - self.id = pool_id - self.workers = 0 - self.waiters = deque() - self.connections = deque() - - def _create_connections_as_needed(self): - while self.workers < self.max and len(self.waiters) > len(self.connections): - socket_name = "opensips_%s%02d.sock" % (self.id, self.workers+1) - socket_path = process.runtime_file(socket_name) - unlink(socket_path) - try: - conn = UNIXSocketConnection(socket_path) - except CannotListenError, why: - log.error("cannot create an OpenSIPS UNIX socket connection: %s" % str(why)) - break - self.connections.append(conn) - self.workers += 1 - - def _release_connection(self, result, conn): - self.connections.append(conn) - self._process_waiters() - return result - - def _process_waiters(self): - while self.waiters: - try: - conn = self.connections.popleft() - except IndexError: - return - request = self.waiters.popleft() - request.deferred.addBoth(self._release_connection, conn) - conn.send(request) - - def defer_to_connection(self, command): - request = Request(command) - self.waiters.append(request) - self._create_connections_as_needed() - self._process_waiters() + self.transport.requests[request.id] = request + request.deferred.addBoth(request.process_response) + reactor.callLater(self.timeout, self._did_timeout, request) + log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred + def _did_timeout(self, request): + if not request.deferred.called: + request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) + self.transport.requests.pop(request.id) + class ManagementInterface(object): __metaclass__ = Singleton - + def __init__(self): - self.pool = UNIXSocketConnectionPool(OpenSIPSConfig.max_connections) + self.connection = UNIXSocketConnection() - ## Reply handlers __RH_xxx - - def __RH_end_dialog(self, result): - if isinstance(result, Failure): - log.error("failed to end dialog: %s" % result.value) - return False - return True + def reload_domains(self): + return self.connection.send(DomainReload()) + + def reload_addresses(self): + return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): - cmd = ':dlg_end_dlg:\n%s\n%s\n\n' % (dialog_id.h_entry, dialog_id.h_id) - return self.pool.defer_to_connection(cmd).addBoth(self.__RH_end_dialog) + return self.connection.send(EndDialog(dialog_id)) + + def get_online_devices(self, account): + return self.connection.send(GetOnlineDevices(account)) + + def refresh_watchers(self, account, refresh_type): + return self.connection.send(RefreshWatchers(account, refresh_type)) + def update_subscriptions(self, account): + return self.connection.send(UpdateSubscriptions(account))