diff --git a/config.ini.sample b/config.ini.sample index 2d8a124..0831e86 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,237 +1,242 @@ [Relay] ; A list of dispatchers to connect to, separated by spaces. The format is ; "host[:port] [host[:port] ...]". If a port is not specified the default port ; of 25060 will be used. "host" can be one of the following: ; - A domain name that has a SRV record for a SIP proxy, i.e. at ; "_sip._udp.". If the DNS lookup for this succeeds the relay ; will connect to the IP address of the SIP proxy on the port specified in ; this configuration. ; - A hostname. The lookup for this will be performed if the SRV lookup ; fails. ; - An IP address. The relay will connect directly to this address. ; Both the SRV and hostname lookups will be periodically refreshed (see ; "dns_check_interval" below). ; ;dispatchers = example.com 1.2.3.4:12345 ; Specify extra checks to be performed on the dispatcher TLS credentials before ; considering the connection with the dispatcher succesful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; dispatcher certificate. A wildcard (*) can be used in the value at the ; beginning or the end of the string to indicate that the corresponding ; attribute from the dispatcher certificate must end with respectively to ; start with the given string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:*dispatcher ; means that a connection with a dispatcher will only be accepted if the ; dispatcher certificate subject has organization set to "AG Projects" and ; the common name ends with "dispatcher". To specify that no additional ; identity checks need to be performed, use the keyword None. If passport ; is None, then only the certificate signature is verified agains the ; certificate authority in tls/ca.pem (signature is always verified even ; when passport is None). ; ; Default value is None. ; ;passport = None ; The host IP address used for relaying streams. The default for this value ; is to use the IP address of the interface that has the default route. This ; is the most appropriate choice for almost any situation. Unless you need to ; use a very specific interface, which is not the default one, there is no need ; to set this option. Leave this option commented to use the default value. ;relay_ip = +; The host IP address to return when a session is allocated in th relay. This +; could be of use in case the relay is behind NAT but it has a 1 to 1 mapping +; with a public IP address, like Amazon EC2, for example. +;advertised_ip = + ; The port range to use for relaying media streams in the form start:end with ; start and end being even numbers in the [1024, 65536] range and start < end ; The default range is 50000:60000. You should allocate 4 times the number of ; streams you plan for the relay to handle simultaneously. The default range ; having 10000 ports, is able to handle up to 2500 streams. ; ;port_range = 50000:60000 ; The minimum level log messages need to have in order to appear in syslog ; or on the console, depending on the mode the relay is running in. ; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or ; DEBUG. ;log_level = DEBUG ; The amount of time to wait for a stream in a new SDP offer to start sending ; data before the relay decides that it has timed out. The default value is 90 ; seconds. This only applies to the initial setup stage, before the first ; packet for a stream is received (from both ends). After the stream is started ; and the conntrack rule is in place, the idle timeout (how long before the ; conntrack rule expires when no traffic is received) is controlled by a kernel ; setting that defaults to 180 seconds and can be adjusted in: ; /proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream ; ;stream_timeout = 90 ; Amount of time a call can be on hold before it is declared expired by the ; relay. The default value is 7200 seconds (2 hours). ; ;on_hold_timeout = 7200 ; How often to check in DNS if the SRV and A records for the dispatcher have ; changed. Interval is in seconds and the default value is 60 seconds. ; ;dns_check_interval = 60 ; If the relay cannot connect to a dispatcher is should retry after this ; amount of seconds. The default value is 10 seconds. ; ;reconnect_delay = 10 ; How often to sample the aggregate ammount of data processed by the relay, in ; order to compute an average of the relayed traffic over that period. The ; value is expressed in seconds and the default value is 15 seconds. ; Use 0 to disable it in case you have to many streams processed by the relay ; and it warns you in syslog that gathering this information takes too long. ; ;traffic_sampling_period = 15 [Dispatcher] ; Local socket on which to communicate with OpenSIPS. The OpenSIPS mediaproxy ; module should be configured to connect to this socket. If a relative path, ; /var/run/mediaproxy will be prepended. Default value is dispatcher.sock. ; ;socket_path = dispatcher.sock ; Listen address for incoming connections from the relays. The format is ; "ip[:port]". If the ip is "0.0.0.0" or the keyword "any", the dispatcher ; will listen on all interfaces of this host. If the port is not specified, ; the dispatcher will listen on the default port of 25060. ; ;listen = 0.0.0.0 ; Listen address for incoming management interface connections. Clients can ; connect to this and issue commands to query the status of the relays and ; their sessions. The format is "ip[:port]". If the ip is "0.0.0.0" or the ; keyword "any", the dispatcher will listen on all interfaces of this host. ; If the port is not specified, the dispatcher will listen on the default ; port of 25061. ; ;listen_management = 0.0.0.0 ; Whether or not to use TLS on the management interface. Note that the same ; TLS credentials are used for both the relay and the management interface ; connections. ; ; Default value is yes. ; ;management_use_tls = yes ; Specify extra checks to be performed on the relay TLS credentials before ; considering the connection with the relay succesful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; relay certificate. A wildcard (*) can be used in the value at the beginning ; or the end of the string to indicate that the corresponding attribute from ; the relay certificate must end with respectively to start with the given ; string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:relay* ; means that a connection with a relay will only be accepted if the relay ; certificate subject has organization set to "AG Projects" and the common ; name starts with "relay". To specify that no additional identity checks ; need to be performed, use the keyword None. If passport is None, then only ; the certificate signature is verified agains the certificate authority in ; tls/ca.pem (signature is always verified even when passport is None). ; ; Default value is None. ; ;passport = None ; This option is similar to passport above, but applies to the management ; interface connections instead of relay connections. It specifies extra ; checks to be performed on the TLS credentials suplied by an entity that ; connects to the management interface. Please consult passport above for ; a detailed description of the possible values for this option. ; ; If management_use_tls is false, this option is ignored. ; ; Default value is None. ; ;management_passport = None ; The minimum level log messages need to have in order to appear in syslog ; or on the console, depending on the mode the dispatcher is running in. ; In order of severity, this can be one of CRITICAL, ERROR, WARNING, INFO or ; DEBUG. ;log_level = DEBUG ; Timeout value in second for individual relays. When a command is sent from ; the dispatcher to a relay it will wait this amount of seconds for a reply. ; The default is 5 seconds. ; ;relay_timeout = 5 ; A comma separated list of accounting backends that will be used to save ; accounting data with the session information once a session has finished. ; Currently 2 backends are available: "radius" and "database". If enabled ; they can be configured below in their respective sections. The default ; is to use no accounting backend. ; ;accounting = [TLS] ; Path to the certificates. If relative, it will be looked up in both the ; application directory (for a standalone installation) and /etc/mediaproxy, ; the former taking precedence if found. ; ;certs_path = tls ; How often (in seconds) to verify the peer certificate for expiration and ; revocation. Default value is 300 seconds (5 minutes) ; ;verify_interval = 300 [Database] ; This section needs to be configured if database accounting is enabled ; Database URI in the form: scheme://user:password@host/database ;dburi = mysql://mediaproxy:CHANGEME@localhost/mediaproxy ; Name for the table. ;sessions_table = media_sessions ; Column names. Columns are strings except for info which is a BLOB ; ;callid_column = call_id ;fromtag_column = from_tag ;totag_column = to_tag ;info_column = info [Radius] ; This section needs to be configured if radius accounting is enabled ; OpenSIPS RADIUS configuration file. All RADIUS cofiguration parameters will ; be read from this file, including dictionary files. ; ;config_file = /etc/opensips/radius/client.conf ; Additional dictionary file with MediaProxy specific attributes. ;additional_dictionary = radius/dictionary [OpenSIPS] ; Configure interaction between the media dispatcher and OpenSIPS ; Path to OpenSIPS's UNIX filesystem socket from the mi_datagram module. ;socket_path = /var/run/opensips/socket ; Maximum number of connections to open with OpenSIPS's mi_datagram socket. ; Please note that connections will be opened on a need basis depending on ; load, but never more than the number configured below. ; ;max_connections = 10 diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py index 30fc16b..0cce396 100644 --- a/mediaproxy/configuration/__init__.py +++ b/mediaproxy/configuration/__init__.py @@ -1,88 +1,89 @@ # Copyright (C) 2008-2014 AG Projects # from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress from application.system import host from mediaproxy import configuration_filename from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator class DispatcherConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Dispatcher' socket_path = "dispatcher.sock" listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress("any")) listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress("any")) relay_timeout = 5 # How much to wait for an answer from a relay relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it cleanup_dead_relays_after = 43200 # 12 hours cleanup_expired_sessions_after = 86400 # 24 hours management_use_tls = True accounting = ConfigSetting(type=AccountingModuleList, value=[]) passport = ConfigSetting(type=X509NameValidator, value=None) management_passport = ConfigSetting(type=X509NameValidator, value=None) class RelayConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Relay' relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip) + advertised_ip = ConfigSetting(type=IPAddress, value=None) stream_timeout = 90 on_hold_timeout = 7200 traffic_sampling_period = 15 userspace_transmit_every = 1 dispatchers = ConfigSetting(type=DispatcherAddressList, value=[]) port_range = PortRange("50000:60000") dns_check_interval = PositiveInteger(60) keepalive_interval = PositiveInteger(10) reconnect_delay = PositiveInteger(10) passport = ConfigSetting(type=X509NameValidator, value=None) class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'OpenSIPS' socket_path = '/var/run/opensips/socket' max_connections = 10 class RadiusConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Radius' config_file = "/etc/opensips/radius/client.conf" additional_dictionary = "radius/dictionary" class DatabaseConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Database' dburi = "" sessions_table = "media_sessions" callid_column = "call_id" fromtag_column = "from_tag" totag_column = "to_tag" info_column = "info" class TLSConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'TLS' certs_path = 'tls' verify_interval = 300 class ThorNetworkConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' domain = ConfigSetting(type=SIPThorDomain, value=None) node_ip = host.default_ip diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index 70d745b..76e6c72 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,389 +1,389 @@ # Copyright (C) 2008 AG Projects # Author: Ruud Klaver # """Implementation of the MediaProxy relay""" import cjson import signal import resource from time import time try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError("mandatory epoll reactor support is missing from the twisted framework") from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials ## Increase the system limit for the maximum number of open file descriptors ## to be able to handle connections to all ports in port_range try: fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError("Cannot set resource limit for maximum open file descriptors to %d" % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.msg("Set resource limit for maximum open file descriptors to %d" % fd_limit) class RelayClientProtocol(LineOnlyReceiver): noisy = False required_headers = {'update': set(['call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type']), 'remove': set(['call_id', 'from_tag']), 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: # 3 keepalives in a row didn't get an answer. assume connection is down. log.error("missed 3 keepalive answers in a row. assuming the connection is down.") # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None self.transport.write("ping\r\n") self._queued_keepalives += 1 return KeepRunning def connectionMade(self): peer = self.transport.getPeer() log.debug("Connected to dispatcher at %s:%d" % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() if not RelayConfig.passport.accept(peer_cert): self.transport.loseConnection(CertificateSecurityError('peer certificate not accepted')) self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error("Could not decode command/sequence number pair from dispatcher: %s" % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error("Unknown command: %s" % command) self.transport.write("%s error\r\n" % seq) elif line == "": try: missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error("Missing mandatory header '%s' from '%s' command" % (header, self.command)) response = "error" else: try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except: log.err() response = "error" finally: self.transport.write("%s %s\r\n" % (self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error("Unable to parse header: %s" % line) else: try: self.headers[name] = value except DecodingError, e: log.error("Could not decode header: %s" % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() if reason.type != ConnectionDone: log.error("Connection with dispatcher at %(host)s:%(port)d was lost: %%s" % connector.__dict__ % reason.value) else: log.msg("Connection with dispatcher at %(host)s:%(port)d was closed" % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): def __init__(self): self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning def _cb_got_srv(self, (answers, auth, add), port): for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) reactor.run(installSignalHandlers=False) def _handle_SIGHUP(self, *args): log.msg("Received SIGHUP, shutting down after all sessions have expired.") reactor.callFromThread(self.shutdown, graceful=True) def _handle_SIGINT(self, *args): if process._daemon: log.msg("Received SIGINT, shutting down.") else: log.msg("Received KeyboardInterrupt, exiting.") reactor.callFromThread(self.shutdown) def _handle_SIGTERM(self, *args): log.msg("Received SIGTERM, shutting down.") reactor.callFromThread(self.shutdown) def shutdown(self, graceful=False): raise NotImplementedError() def on_shutdown(self): pass def _shutdown(self): reactor.stop() self.on_shutdown() try: from mediaproxy.sipthor import SIPThorMediaRelayBase MediaRelayBase = SIPThorMediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() MediaRelayBase.__init__(self) @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): if new_dispatcher in self.old_connectors.iterkeys(): log.debug('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: log.debug('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.cred) for old_dispatcher in self.dispatchers.difference(dispatchers): log.debug('Removing old dispatcher at %s:%d' % old_dispatcher) self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == "summary": summary = {'ip' : RelayConfig.relay_ip, 'version' : __version__, 'status' : self.status, 'uptime' : int(time() - self.start_time), 'session_count' : len(self.session_manager.sessions), 'stream_count' : self.session_manager.stream_count, 'bps_relayed' : self.session_manager.bps_relayed} return cjson.encode(summary) elif command == "sessions": return cjson.encode(self.session_manager.statistics) elif command == "update": if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): log.debug("cannot add new session: media-relay is shutting down") return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error("Could not reserve relay ports for session, all allocated ports are being used") return "error" if local_media: - return " ".join([local_media[0][0]] + [str(media[1]) for media in local_media]) + return " ".join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) else: # remove session = self.session_manager.remove_session(**headers) if session is None: return "error" else: return cjson.encode(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == "connected": connector.transport.write(" ".join(["expired", cjson.encode(session.statistics)]) + "\r\n") else: log.warn("dispatcher for expired session is no longer online, statistics are lost!") def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self.shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() def connector_needs_reconnect(self, connector): if connector in self.dispatcher_connectors.values(): return True else: for dispatcher, old_connector in self.old_connectors.items(): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown() return False def shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown() else: self.update_dispatchers([])