diff --git a/config.ini.sample b/config.ini.sample index d91a797..3913f32 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,240 +1,234 @@ [Relay] ; A list of dispatchers to connect to, separated by spaces. The format is ; "host[:port] [host[:port] ...]". If a port is not specified the default port ; of 25060 will be used. "host" can be one of the following: ; - A domain name that has a SRV record for a SIP proxy, i.e. at ; "_sip._udp.". If the DNS lookup for this succeeds the relay ; will connect to the IP address of the SIP proxy on the port specified in ; this configuration. ; - A hostname. The lookup for this will be performed if the SRV lookup ; fails. ; - An IP address. The relay will connect directly to this address. ; Both the SRV and hostname lookups will be periodically refreshed (see ; "dns_check_interval" below). ; ;dispatchers = example.com 1.2.3.4:12345 ; Specify extra checks to be performed on the dispatcher TLS credentials before ; considering the connection with the dispatcher successful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; dispatcher certificate. A wildcard (*) can be used in the value at the ; beginning or the end of the string to indicate that the corresponding ; attribute from the dispatcher certificate must end with respectively to ; start with the given string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:*dispatcher ; means that a connection with a dispatcher will only be accepted if the ; dispatcher certificate subject has organization set to "AG Projects" and ; the common name ends with "dispatcher". To specify that no additional ; identity checks need to be performed, use the keyword None. If passport ; is None, then only the certificate signature is verified against the ; certificate authority in tls/ca.pem (signature is always verified even ; when passport is None). ; ; Default value is None. ; ;passport = None ; The host IP address used for relaying streams. The default for this value ; is to use the IP address of the interface that has the default route. This ; is the most appropriate choice for almost any situation. Unless you need to ; use a very specific interface, which is not the default one, there is no need ; to set this option. Leave this option commented to use the default value. ;relay_ip = ; The host IP address to return when a session is allocated in the relay. This ; could be of use in case the relay is behind NAT but it has a 1 to 1 mapping ; with a public IP address, like Amazon EC2, for example. ;advertised_ip = ; The port range to use for relaying media streams in the form start:end with ; start and end being even numbers in the [1024, 65536] range and start < end ; The default range is 50000:60000. You should allocate 4 times the number of ; streams you plan for the relay to handle simultaneously. The default range ; having 10000 ports, is able to handle up to 2500 streams. ; ;port_range = 50000:60000 -; The minimum level log messages need to have in order to appear in syslog -; or on the console, depending on the mode the relay is running in. -; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or -; DEBUG. -;log_level = DEBUG +; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG) +;log_level = INFO ; The amount of time to wait for a stream in a new SDP offer to start sending ; data before the relay decides that it has timed out. The default value is 90 ; seconds. This only applies to the initial setup stage, before the first ; packet for a stream is received (from both ends). After the stream is started ; and the conntrack rule is in place, the idle timeout (how long before the ; conntrack rule expires when no traffic is received) is controlled by a kernel ; setting that defaults to 180 seconds and can be adjusted in: ; /proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream ; ;stream_timeout = 90 ; Amount of time a call can be on hold before it is declared expired by the ; relay. The default value is 7200 seconds (2 hours). ; ;on_hold_timeout = 7200 ; How often to check in DNS if the SRV and A records for the dispatcher have ; changed. Interval is in seconds and the default value is 60 seconds. ; ;dns_check_interval = 60 ; If the relay cannot connect to a dispatcher is should retry after this ; amount of seconds. The default value is 10 seconds. ; ;reconnect_delay = 10 ; How often to sample the aggregate amount of data processed by the relay, in ; order to compute an average of the relayed traffic over that period. The ; value is expressed in seconds and the default value is 15 seconds. ; Use 0 to disable it in case you have to many streams processed by the relay ; and it warns you in syslog that gathering this information takes too long. ; ;traffic_sampling_period = 15 ; Specify a list of network ranges (in CIDR notation) for which media is relayed ; even if no packet was received from the endpoint and the IP address is private. ;routable_private_ranges = 192.168.1.0/24 [Dispatcher] ; Local socket on which to communicate with OpenSIPS. The OpenSIPS mediaproxy ; module should be configured to connect to this socket. If a relative path, ; the runtime directory will be prepended. Default value is dispatcher.sock. ; ;socket_path = dispatcher.sock ; Listen address for incoming connections from the relays. The format is ; "ip[:port]". If the ip is "0.0.0.0" or the keyword "any", the dispatcher ; will listen on all interfaces of this host. If the port is not specified, ; the dispatcher will listen on the default port of 25060. ; ;listen = 0.0.0.0 ; Listen address for incoming management interface connections. Clients can ; connect to this and issue commands to query the status of the relays and ; their sessions. The format is "ip[:port]". If the ip is "0.0.0.0" or the ; keyword "any", the dispatcher will listen on all interfaces of this host. ; If the port is not specified, the dispatcher will listen on the default ; port of 25061. ; ;listen_management = 0.0.0.0 ; Whether or not to use TLS on the management interface. Note that the same ; TLS credentials are used for both the relay and the management interface ; connections. ; ; Default value is yes. ; ;management_use_tls = yes ; Specify extra checks to be performed on the relay TLS credentials before ; considering the connection with the relay successful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; relay certificate. A wildcard (*) can be used in the value at the beginning ; or the end of the string to indicate that the corresponding attribute from ; the relay certificate must end with respectively to start with the given ; string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:relay* ; means that a connection with a relay will only be accepted if the relay ; certificate subject has organization set to "AG Projects" and the common ; name starts with "relay". To specify that no additional identity checks ; need to be performed, use the keyword None. If passport is None, then only ; the certificate signature is verified against the certificate authority in ; tls/ca.pem (signature is always verified even when passport is None). ; ; Default value is None. ; ;passport = None ; This option is similar to passport above, but applies to the management ; interface connections instead of relay connections. It specifies extra ; checks to be performed on the TLS credentials supplied by an entity that ; connects to the management interface. Please consult passport above for ; a detailed description of the possible values for this option. ; ; If management_use_tls is false, this option is ignored. ; ; Default value is None. ; ;management_passport = None -; The minimum level log messages need to have in order to appear in syslog -; or on the console, depending on the mode the dispatcher is running in. -; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or -; DEBUG. -;log_level = DEBUG +; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG) +;log_level = INFO ; Timeout value in second for individual relays. When a command is sent from ; the dispatcher to a relay it will wait this amount of seconds for a reply. ; The default is 5 seconds. ; ;relay_timeout = 5 ; A comma separated list of accounting backends that will be used to save ; accounting data with the session information once a session has finished. ; Currently 2 backends are available: "radius" and "database". If enabled ; they can be configured below in their respective sections. The default ; is to use no accounting backend. ; ;accounting = [TLS] ; Path to the certificates. If relative, it will be looked up in both the ; application directory (for a standalone installation) and /etc/mediaproxy, ; the former taking precedence if found. ; ;certs_path = tls ; How often (in seconds) to verify the peer certificate for expiration and ; revocation. Default value is 300 seconds (5 minutes) ; ;verify_interval = 300 [Database] ; This section needs to be configured if database accounting is enabled ; Database URI in the form: scheme://user:password@host/database ;dburi = mysql://mediaproxy:CHANGEME@localhost/mediaproxy ; Name for the table. ;sessions_table = media_sessions ; Column names. Columns are strings except for info which is a BLOB ; ;callid_column = call_id ;fromtag_column = from_tag ;totag_column = to_tag ;info_column = info [Radius] ; This section needs to be configured if radius accounting is enabled ; OpenSIPS RADIUS configuration file. All RADIUS configuration parameters ; will be read from this file, including dictionary files. ; ;config_file = /etc/opensips/radius/client.conf ; Additional dictionary file with MediaProxy specific attributes. ;additional_dictionary = radius/dictionary [OpenSIPS] ; Configure interaction between the media dispatcher and OpenSIPS ; Path to OpenSIPS's UNIX filesystem socket from the mi_datagram module. ;socket_path = /run/opensips/socket diff --git a/media-dispatcher b/media-dispatcher index b81988f..58411a2 100644 --- a/media-dispatcher +++ b/media-dispatcher @@ -1,80 +1,77 @@ #!/usr/bin/env python if __name__ == '__main__': import mediaproxy import sys from application import log - from application.configuration import ConfigFile, datatypes from application.process import process, ProcessError from argparse import ArgumentParser name = 'media-dispatcher' fullname = 'MediaProxy Dispatcher' description = 'MediaProxy Dispatcher component' process.configuration.user_directory = None process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__)) parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal') parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal') parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH') parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' if options.config_directory is not None: process.configuration.local_directory = options.config_directory if options.runtime_directory is not None: process.runtime.directory = options.runtime_directory - config_file = ConfigFile(mediaproxy.configuration_file) - log.level.current = config_file.get_setting('Dispatcher', 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) - try: process.runtime.create_directory() except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) if options.systemd: from systemd.journal import JournalHandler log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name)) log.capture_output() elif options.fork: try: process.daemonize(pidfile='{}.pid'.format(name)) except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) from mediaproxy.dispatcher import Dispatcher + from mediaproxy.configuration import DispatcherConfig + + log.level.current = log.level.DEBUG if options.debug else DispatcherConfig.log_level - if options.debug: - log.level.current = log.level.DEBUG if options.debug_memory: from application.debug.memory import memory_dump try: dispatcher = Dispatcher() except Exception as e: log.critical('Failed to create %s: %s' % (fullname, e)) if type(e) is not RuntimeError: log.exception() sys.exit(1) dispatcher.run() if options.debug_memory: memory_dump() diff --git a/media-relay b/media-relay index 40f491e..05b5162 100644 --- a/media-relay +++ b/media-relay @@ -1,124 +1,120 @@ #!/usr/bin/env python if __name__ == '__main__': import mediaproxy import errno import sys import subprocess from application import log - from application.configuration import ConfigFile, datatypes from application.process import process, ProcessError from application.version import Version from argparse import ArgumentParser IP_FORWARD_FILE = '/proc/sys/net/ipv4/ip_forward' CONNTRACK_ACCT_FILE = '/proc/sys/net/netfilter/nf_conntrack_acct' KERNEL_VERSION_FILE = '/proc/sys/kernel/osrelease' name = 'media-relay' fullname = 'MediaProxy Relay' description = 'MediaProxy Relay component' process.configuration.user_directory = None process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__)) parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal') parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal') parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH') parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' if options.config_directory is not None: process.configuration.local_directory = options.config_directory if options.runtime_directory is not None: process.runtime.directory = options.runtime_directory if not sys.platform.startswith('linux'): log.critical('Cannot start %s. A Linux host is required for operation.' % fullname) sys.exit(1) try: subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'}) except OSError as e: log.critical('Cannot start %s: failed to load the ip_tables kernel module: %s' % (fullname, e)) sys.exit(1) try: kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip()) except (OSError, IOError, ValueError): log.critical('Could not determine Linux kernel version') sys.exit(1) if kernel_version < Version(2, 6, 18): log.critical('Linux kernel version 2.6.18 or newer is required to run the media relay') sys.exit(1) try: ip_forward = bool(int(open(IP_FORWARD_FILE).read())) except (OSError, IOError, ValueError): ip_forward = False if not ip_forward: log.critical('IP forwarding is not available or not enabled (check %s)' % IP_FORWARD_FILE) sys.exit(1) try: with open(CONNTRACK_ACCT_FILE, 'w') as acct_file: acct_file.write('1') except (IOError, OSError) as e: if e.errno != errno.ENOENT: log.critical('Could not enable conntrack rule counters (check %s): %s' % (CONNTRACK_ACCT_FILE, e)) sys.exit(1) - config_file = ConfigFile(mediaproxy.configuration_file) - log.level.current = config_file.get_setting('Relay', 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) - if options.systemd: from systemd.journal import JournalHandler log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name)) log.capture_output() elif options.fork: try: process.daemonize(pidfile='{}.pid'.format(name)) except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) try: process.wait_for_network(wait_time=10, wait_message='Waiting for network to become available...') except KeyboardInterrupt: sys.exit(0) except RuntimeError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) try: from mediaproxy.relay import MediaRelay - if options.debug: - log.level.current = log.level.DEBUG + from mediaproxy.configuration import RelayConfig + log.level.current = log.level.DEBUG if options.debug else RelayConfig.log_level if options.debug_memory: from application.debug.memory import memory_dump relay = MediaRelay() except Exception as e: log.critical('Failed to create %s: %s' % (fullname, e)) if type(e) is not RuntimeError: log.exception() sys.exit(1) relay.run() if options.debug_memory: memory_dump() diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py index 5cee53e..f7599fb 100644 --- a/mediaproxy/configuration/__init__.py +++ b/mediaproxy/configuration/__init__.py @@ -1,87 +1,90 @@ +from application import log from application.configuration import ConfigSection, ConfigSetting -from application.configuration.datatypes import IPAddress, NetworkRangeList +from application.configuration.datatypes import IPAddress, LogLevel, NetworkRangeList from application.system import host from mediaproxy import configuration_file from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator class DispatcherConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Dispatcher' socket_path = 'dispatcher.sock' listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress('any')) listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress('any')) relay_timeout = 5 # How much to wait for an answer from a relay relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it cleanup_dead_relays_after = 43200 # 12 hours cleanup_expired_sessions_after = 86400 # 24 hours management_use_tls = True accounting = ConfigSetting(type=AccountingModuleList, value=[]) passport = ConfigSetting(type=X509NameValidator, value=None) management_passport = ConfigSetting(type=X509NameValidator, value=None) + log_level = ConfigSetting(type=LogLevel, value=log.level.INFO) class RelayConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Relay' relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip) advertised_ip = ConfigSetting(type=IPAddress, value=None) stream_timeout = 90 on_hold_timeout = 7200 traffic_sampling_period = 15 userspace_transmit_every = 1 dispatchers = ConfigSetting(type=DispatcherAddressList, value=[]) port_range = PortRange('50000:60000') dns_check_interval = PositiveInteger(60) keepalive_interval = PositiveInteger(10) reconnect_delay = PositiveInteger(10) passport = ConfigSetting(type=X509NameValidator, value=None) routable_private_ranges = ConfigSetting(type=NetworkRangeList, value=[]) + log_level = ConfigSetting(type=LogLevel, value=log.level.INFO) class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'OpenSIPS' socket_path = '/run/opensips/socket' location_table = 'location' class RadiusConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Radius' config_file = '/etc/opensips/radius/client.conf' additional_dictionary = 'radius/dictionary' class DatabaseConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Database' dburi = '' sessions_table = 'media_sessions' callid_column = 'call_id' fromtag_column = 'from_tag' totag_column = 'to_tag' info_column = 'info' class TLSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'TLS' certs_path = 'tls' verify_interval = 300 class ThorNetworkConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'ThorNetwork' domain = ConfigSetting(type=SIPThorDomain, value=None) node_ip = host.default_ip diff --git a/mediaproxy/dispatcher.py b/mediaproxy/dispatcher.py index 61a1e89..4be8953 100644 --- a/mediaproxy/dispatcher.py +++ b/mediaproxy/dispatcher.py @@ -1,600 +1,609 @@ """Implementation of the MediaProxy dispatcher""" import hashlib import random import signal import cPickle as pickle import cjson from base64 import b64encode as base64_encode from collections import deque from itertools import ifilter from time import time from application import log from application.process import process from application.system import unlink from gnutls.errors import CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from twisted.protocols.basic import LineOnlyReceiver from twisted.python import failure from twisted.internet.error import ConnectionDone, TCPTimedOutError from twisted.internet.protocol import Factory, connectionDone from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed from twisted.internet import reactor from mediaproxy import __version__ from mediaproxy.configuration import DispatcherConfig from mediaproxy.interfaces import opensips from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials class CommandError(Exception): pass class Command(object): def __init__(self, name, headers=None): self.name = name self.headers = headers or [] try: self.parsed_headers = dict(header.split(': ', 1) for header in self.headers) except Exception: raise CommandError('Could not parse command headers') else: self.__dict__['session_id'] = None if self.call_id is None else base64_encode(hashlib.md5(self.call_id).digest()).rstrip('=') @property def call_id(self): return self.parsed_headers.get('call_id') @property def dialog_id(self): return self.parsed_headers.get('dialog_id') @property def session_id(self): return self.__dict__['session_id'] class ProtocolLogger(log.ContextualLogger): def __init__(self, name): super(ProtocolLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend self.name = name def apply_context(self, message): return '[{0}] {1}'.format(self.name, message) if message != '' else '' class SessionLogger(log.ContextualLogger): def __init__(self, session): super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend self.session_id = session.session_id self.relay_ip = session.relay_ip def apply_context(self, message): return '[session {0.session_id} at {0.relay_ip}] {1}'.format(self, message) if message != '' else '' class ControlProtocol(LineOnlyReceiver): logger = None # type: ProtocolLogger noisy = False def __init__(self): self.in_progress = 0 def lineReceived(self, line): raise NotImplementedError() def connectionLost(self, reason=connectionDone): if isinstance(reason.value, connectionDone.type): self.logger.info('Connection closed') else: self.logger.warning('Connection lost: {}'.format(reason.value)) self.factory.connection_lost(self) def reply(self, reply): self.transport.write(reply + self.delimiter) def _error_handler(self, failure): failure.trap(CommandError, RelayError) self.logger.error(failure.value) self.reply('error') def _catch_all(self, failure): self.logger.error(failure.getTraceback()) self.reply('error') def _decrement(self, result): self.in_progress = 0 if self.factory.shutting_down: self.transport.loseConnection() def _add_callbacks(self, defer): defer.addCallback(self.reply) defer.addErrback(self._error_handler) defer.addErrback(self._catch_all) defer.addBoth(self._decrement) class OpenSIPSControlProtocol(ControlProtocol): logger = ProtocolLogger(name='OpenSIPS Interface') def __init__(self): self.request_lines = [] ControlProtocol.__init__(self) def lineReceived(self, line): if line == '': if self.request_lines: self.in_progress += 1 defer = maybeDeferred(self.handle_request, self.request_lines) self._add_callbacks(defer) self.request_lines = [] elif not line.endswith(': '): self.request_lines.append(line) def handle_request(self, request_lines): command = Command(name=request_lines[0], headers=request_lines[1:]) if command.call_id is None: raise CommandError('Request is missing the call_id header') return self.factory.dispatcher.send_command(command) class ManagementControlProtocol(ControlProtocol): logger = ProtocolLogger(name='Management Interface') def connectionMade(self): if DispatcherConfig.management_use_tls and DispatcherConfig.management_passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.management_passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return def lineReceived(self, line): if line in ['quit', 'exit']: self.transport.loseConnection() elif line == 'summary': defer = self.factory.dispatcher.relay_factory.get_summary() self._add_callbacks(defer) elif line == 'sessions': defer = self.factory.dispatcher.relay_factory.get_statistics() self._add_callbacks(defer) elif line == 'version': self.reply(__version__) else: self.logger.error('Unknown command: %s' % line) self.reply('error') class ControlFactory(Factory): noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.protocols = [] self.shutting_down = False def buildProtocol(self, addr): protocol = Factory.buildProtocol(self, addr) self.protocols.append(protocol) return protocol def connection_lost(self, prot): self.protocols.remove(prot) if self.shutting_down and len(self.protocols) == 0: self.defer.callback(None) def shutdown(self): if self.shutting_down: return self.shutting_down = True if len(self.protocols) == 0: return succeed(None) else: for prot in self.protocols: if prot.in_progress == 0: prot.transport.loseConnection() self.defer = Deferred() return self.defer class OpenSIPSControlFactory(ControlFactory): protocol = OpenSIPSControlProtocol class ManagementControlFactory(ControlFactory): protocol = ManagementControlProtocol class RelayError(Exception): pass class ConnectionReplaced(ConnectionDone): pass class RelayServerProtocol(LineOnlyReceiver): MAX_LENGTH = 4096*1024 # 4MB noisy = False def __init__(self): self.ip = None # type: str self.logger = None # type: ProtocolLogger self.commands = {} self.halting = False self.timedout = False self.disconnect_timer = None self.sequence_number = 0 self.authenticated = False @property def active(self): return not self.halting and not self.timedout def send_command(self, command): if command.call_id: self.logger.info('Requesting {0.name!r} for session {0.session_id}'.format(command)) else: self.logger.info('Requesting {0.name!r}'.format(command)) sequence_number = str(self.sequence_number) self.sequence_number += 1 defer = Deferred() timer = reactor.callLater(DispatcherConfig.relay_timeout, self._timeout, sequence_number) self.commands[sequence_number] = (command, defer, timer) self.transport.write(self.delimiter.join(['{} {}'.format(command.name, sequence_number)] + command.headers) + 2*self.delimiter) return defer def reply(self, reply): self.transport.write(reply + self.delimiter) def _timeout(self, sequence_number): command, defer, timer = self.commands.pop(sequence_number) defer.errback(RelayError('%r command failed: relay at %s timed out' % (command.name, self.ip))) if self.timedout is False: self.timedout = True self.disconnect_timer = reactor.callLater(DispatcherConfig.relay_recover_interval, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) def connectionMade(self): if DispatcherConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not DispatcherConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) return self.authenticated = True self.factory.new_relay(self) def lineReceived(self, line): try: first, rest = line.split(' ', 1) except ValueError: first = line rest = '' if first == 'expired': try: stats = cjson.decode(rest) except cjson.DecodeError as e: self.logger.error('Could not decode JSON: {}'.format(e)) else: call_id = stats['call_id'] session = self.factory.sessions.get(call_id, None) if session is None: self.logger.error('Expired session has unknown call_id %s' % call_id) return if session.relay_ip != self.ip: session.logger.error('relay at %s reported the session as expired, ignoring' % self.ip) return all_streams_ice = all(stream_info['status'] == 'unselected ICE candidate' for stream_info in stats['streams']) if all_streams_ice: session.logger.info('removed because ICE was used') stats['timed_out'] = False else: session.logger.info('did timeout') stats['timed_out'] = True stats['dialog_id'] = session.dialog_id stats['all_streams_ice'] = all_streams_ice self.factory.dispatcher.update_statistics(session, stats) if session.dialog_id is not None and stats['start_time'] is not None and not all_streams_ice: self.factory.dispatcher.opensips_management.end_dialog(session.dialog_id) session.expire_time = time() else: del self.factory.sessions[call_id] return elif first == 'ping': if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.reply('pong') return try: command, defer, timer = self.commands.pop(first) except KeyError: self.logger.error('Got unexpected response: {}'.format(line)) return timer.cancel() if rest == 'error': defer.errback(RelayError('Relay replied with error')) elif rest == 'halting': self.halting = True defer.errback(RelayError('Relay is shutting down')) elif command.name == 'remove': try: stats = cjson.decode(rest) except cjson.DecodeError: self.logger.error('Error decoding JSON') else: call_id = stats['call_id'] session = self.factory.sessions[call_id] stats['dialog_id'] = session.dialog_id stats['timed_out'] = False self.factory.dispatcher.update_statistics(session, stats) del self.factory.sessions[call_id] defer.callback('removed') else: # update command defer.callback(rest) def connectionLost(self, reason=connectionDone): if reason.type == ConnectionDone: self.logger.info('Connection closed') elif reason.type == ConnectionReplaced: self.logger.warning('Connection replaced') else: self.logger.error('Connection lost: {}'.format(reason.value)) for command, defer, timer in self.commands.itervalues(): timer.cancel() defer.errback(RelayError('Relay disconnected')) if self.timedout is True: self.timedout = False if self.disconnect_timer.active(): self.disconnect_timer.cancel() self.disconnect_timer = None self.factory.connection_lost(self) class RelaySession(object): def __init__(self, relay, command): self.relay_ip = relay.ip self.call_id = command.call_id self.session_id = command.session_id self.dialog_id = command.dialog_id self.logger = SessionLogger(self) self.expire_time = None class RelayFactory(Factory): protocol = RelayServerProtocol noisy = False def __init__(self, dispatcher): self.dispatcher = dispatcher self.relays = {} self.shutting_down = False state_file = process.runtime.file('dispatcher_state') try: self.sessions = pickle.load(open(state_file)) except: self.sessions = {} self.cleanup_timers = {} else: self.cleanup_timers = dict((ip, reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, ip)) for ip in set(session.relay_ip for session in self.sessions.itervalues())) unlink(state_file) self.expired_cleaner = RecurrentCall(600, self._remove_expired_sessions) def _remove_expired_sessions(self): now, limit = time(), DispatcherConfig.cleanup_expired_sessions_after obsolete = [k for k, s in ifilter(lambda (k, s): s.expire_time and (now-s.expire_time>=limit), self.sessions.iteritems())] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0))) return KeepRunning def buildProtocol(self, addr): protocol = Factory.buildProtocol(self, addr) protocol.ip = addr.host protocol.logger = ProtocolLogger(name='relay {}'.format(addr.host)) protocol.logger.info('Connection established') return protocol def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: relay.logger.warning('Reconnected, closing old connection') reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced('relay reconnected'))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command(Command('sessions')) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): relay_sessions = cjson.decode(result) relay_call_ids = [session['call_id'] for session in relay_sessions] for session_id, session in self.sessions.items(): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: session.logger.warning('Relay does not have the session anymore, statistics are probably lost') if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command): session = self.sessions.get(command.call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: session.logger.error('Request {0.name!r} failed: relay no longer connected'.format(command)) raise RelayError('Request {0.name!r} failed: relay no longer connected'.format(command)) return self.relays[relay].send_command(command) # We do not have a session for this call_id or the session is already expired if command.name == 'update': preferred_relay = command.parsed_headers.get('media_relay') try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warning('user requested media_relay %s is not available' % preferred_relay) defer = self._try_next(try_relays, command) defer.addCallback(self._add_session, try_relays, command) return defer elif command.name == 'remove' and session: # This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[command.call_id] return 'removed' else: raise RelayError('Got {0.name!r} for unknown session {0.session_id}'.format(command)) def _add_session(self, result, try_relays, command): self.sessions[command.call_id] = RelaySession(try_relays[0], command) return result def _relay_error(self, failure, try_relays, command): failure.trap(RelayError) failed_relay = try_relays.popleft() failed_relay.logger.warning('The {0.name!r} request failed: {1.value}'.format(command, failure)) return self._try_next(try_relays, command) def _try_next(self, try_relays, command): if len(try_relays) == 0: raise RelayError('No suitable relay found') defer = try_relays[0].send_command(command) defer.addErrback(self._relay_error, try_relays, command) return defer def get_summary(self): command = Command('summary') defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.itervalues()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, command, relay): relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) return cjson.encode(dict(status='error', ip=relay.ip)) def _got_summaries(self, results): return '[%s]' % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): command = Command('sessions') defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.itervalues()]) defer.addCallback(self._got_statistics) return defer def _statistics_error(self, failure, command, relay): relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) return cjson.encode([]) def _got_statistics(self, results): return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]') def connection_lost(self, relay): if relay not in self.relays.itervalues(): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug('Cleaning up after old relay at %s' % ip) del self.cleanup_timers[ip] for call_id in (call_id for call_id, session in self.sessions.items() if session.relay_ip == ip): del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True for timer in self.cleanup_timers.itervalues(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: for prot in self.relays.itervalues(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w')) class Dispatcher(object): def __init__(self): self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime.file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) else: self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) def run(self): log.debug('Using {0.__class__.__name__}'.format(reactor)) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) + process.signals.add_handler(signal.SIGUSR1, self._handle_SIGUSR1) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def send_command(self, command): return maybeDeferred(self.relay_factory.send_command, command) def update_statistics(self, session, stats): session.logger.info('statistics: {}'.format(stats)) if stats['start_time'] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) except Exception, e: log.exception('An unhandled error occurred while doing accounting: %s' % e) def _handle_SIGHUP(self, *args): log.info('Received SIGHUP, shutting down.') reactor.callFromThread(self._shutdown) def _handle_SIGINT(self, *args): if process.daemon: log.info('Received SIGINT, shutting down.') else: log.info('Received KeyboardInterrupt, exiting.') reactor.callFromThread(self._shutdown) def _handle_SIGTERM(self, *args): log.info('Received SIGTERM, shutting down.') reactor.callFromThread(self._shutdown) + def _handle_SIGUSR1(self, *args): + if log.level.current != log.level.DEBUG: + log.level.current = log.level.DEBUG + log.info('Switched logging level to DEBUG') + else: + log.info('Switched logging level to {}'.format(DispatcherConfig.log_level)) + log.level.current = DispatcherConfig.log_level + def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index 0302ade..87a7d6f 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,392 +1,400 @@ """Implementation of the MediaProxy relay""" import cjson import signal import resource try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError('mandatory epoll reactor support is not available from the twisted framework') from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from time import time from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory, connectionDone from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials # Increase the system limit for the maximum number of open file descriptors # to be able to handle connections to all ports in port_range fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 try: resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError('Cannot set resource limit for maximum open file descriptors to %d' % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.info('Set resource limit for maximum open file descriptors to %d' % fd_limit) class RelayClientProtocol(LineOnlyReceiver): noisy = False required_headers = {'update': {'call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type'}, 'remove': {'call_id', 'from_tag'}, 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self.headers = DecodingDict() self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: log.error('missed 3 keepalive answers in a row. assuming the connection is down.') # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None self.transport.write('ping' + self.delimiter) self._queued_keepalives += 1 return KeepRunning def reply(self, reply): self.transport.write(reply + self.delimiter) def connectionMade(self): peer = self.transport.getPeer() log.info('Connected to dispatcher at %s:%d' % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not RelayConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason=connectionDone): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error('Could not decode command/sequence number pair from dispatcher: %s' % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error('Unknown command: %s' % command) self.reply('{} error'.format(seq)) elif line == '': missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error('Missing mandatory header %r from %r command' % (header, self.command)) response = 'error' else: # noinspection PyBroadException try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except Exception: log.exception() response = 'error' self.reply('{} {}'.format(self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error('Unable to parse header: %s' % line) else: try: self.headers[name] = value except DecodingError, e: log.error('Could not decode header: %s' % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() if reason.type != ConnectionDone: log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value) else: log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): - def __init__(self): self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning def _cb_got_srv(self, (answers, auth, add), port): for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) + process.signals.add_handler(signal.SIGUSR1, self._handle_SIGUSR1) reactor.run(installSignalHandlers=False) def _handle_SIGHUP(self, *args): log.info('Received SIGHUP, shutting down after all sessions have expired.') reactor.callFromThread(self.shutdown, graceful=True) def _handle_SIGINT(self, *args): if process.daemon: log.info('Received SIGINT, shutting down.') else: log.info('Received KeyboardInterrupt, exiting.') reactor.callFromThread(self.shutdown) def _handle_SIGTERM(self, *args): log.info('Received SIGTERM, shutting down.') reactor.callFromThread(self.shutdown) + def _handle_SIGUSR1(self, *args): + if log.level.current != log.level.DEBUG: + log.level.current = log.level.DEBUG + log.info('Switched logging level to DEBUG') + else: + log.info('Switched logging level to {}'.format(RelayConfig.log_level)) + log.level.current = RelayConfig.log_level + def shutdown(self, graceful=False): raise NotImplementedError() def on_shutdown(self): pass def _shutdown(self): reactor.stop() self.on_shutdown() try: from mediaproxy.sipthor import SIPThorMediaRelayBase as MediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') self.tls_context = TLSContext(self.cred) self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() super(MediaRelay, self).__init__() @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): if new_dispatcher in self.old_connectors.iterkeys(): log.info('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: log.info('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context) for old_dispatcher in self.dispatchers.difference(dispatchers): log.info('Removing old dispatcher at %s:%d' % old_dispatcher) self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == 'summary': summary = {'ip': RelayConfig.relay_ip, 'version': __version__, 'status': self.status, 'uptime': int(time() - self.start_time), 'session_count': len(self.session_manager.sessions), 'stream_count': self.session_manager.stream_count, 'bps_relayed': self.session_manager.bps_relayed} return cjson.encode(summary) elif command == 'sessions': return cjson.encode(self.session_manager.statistics) elif command == 'update': if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): log.info('cannot add new session: media-relay is shutting down') return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error('Could not reserve relay ports for session, all allocated ports are being used') return 'error' if local_media: return ' '.join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) else: # command == 'remove' session = self.session_manager.remove_session(**headers) if session is None: return 'error' else: return cjson.encode(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == 'connected': connector.transport.write(' '.join(['expired', cjson.encode(session.statistics)]) + connector.factory.protocol.delimiter) else: log.warning('dispatcher for expired session is no longer online, statistics are lost!') def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self.shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() def connector_needs_reconnect(self, connector): if connector in self.dispatcher_connectors.values(): return True else: for dispatcher, old_connector in self.old_connectors.items(): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown() return False def shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() else: self.update_dispatchers([]) diff --git a/mediaproxy/sipthor.py b/mediaproxy/sipthor.py index 46fe799..427dd6d 100644 --- a/mediaproxy/sipthor.py +++ b/mediaproxy/sipthor.py @@ -1,62 +1,64 @@ """SIP Thor backend""" from application import log from gnutls.interfaces.twisted import TLSContext from thor.entities import ThorEntities, GenericThorEntity from thor.eventservice import EventServiceClient, ThorEvent from thor.tls import X509Credentials from mediaproxy import __version__ from mediaproxy.configuration import ThorNetworkConfig from mediaproxy.configuration.datatypes import DispatcherIPAddress from mediaproxy.relay import SRVMediaRelayBase if ThorNetworkConfig.domain is None: # SIP Thor is installed but disabled. Fake an ImportError to start in standalone media relay mode. log.warning('SIP Thor is installed but disabled from the configuration') raise ImportError('SIP Thor is disabled') class SIPThorMediaRelayBase(EventServiceClient, SRVMediaRelayBase): topics = ['Thor.Members'] def __init__(self): self.node = GenericThorEntity(ThorNetworkConfig.node_ip, ['media_relay'], version=__version__) self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) self.sipthor_dispatchers = [] self.additional_dispatchers = [] credentials = X509Credentials(cert_name='relay') tls_context = TLSContext(credentials) EventServiceClient.__init__(self, ThorNetworkConfig.domain, tls_context) SRVMediaRelayBase.__init__(self) def handle_event(self, event): if not self.shutting_down: sip_proxy_ips = [node.ip for node in ThorEntities(event.message, role='sip_proxy')] self.sipthor_dispatchers = [(ip, DispatcherIPAddress.default_port) for ip in sip_proxy_ips] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def _cb_got_all(self, results): if not self.shutting_down: self.additional_dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(self.sipthor_dispatchers + self.additional_dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def _handle_SIGHUP(self, *args): SRVMediaRelayBase._handle_SIGHUP(self, *args) def _handle_SIGINT(self, *args): SRVMediaRelayBase._handle_SIGINT(self, *args) def _handle_SIGTERM(self, *args): SRVMediaRelayBase._handle_SIGTERM(self, *args) + def _handle_SIGUSR1(self, *args): + SRVMediaRelayBase._handle_SIGUSR1(self, *args) + def shutdown(self, graceful=False): raise NotImplementedError() -