diff --git a/README b/README index 6596de1..a02f0e2 100644 --- a/README +++ b/README @@ -1,348 +1,378 @@ MediaProxy ---------- Authors: Ruud Klaver, Dan Pascu, Saul Ibarra Home page: http://mediaproxy.ag-projects.com License ------- This software is licensed according to the GNU General Public License version 2. See LICENSE file for more details. For other licensing options please contact sales-request@ag-projects.com Description ----------- MediaProxy is a media relay for RTP/RTCP and UDP streams that works in tandem with OpenSIPS to provide NAT traversal capability for media streams from SIP user agents located behind NAT. When using MediaProxy, NAT traversal for RTP media will work without any settings in the SIP User Agents or the NAT router. Features -------- - Scalability of thousands of calls per server limited only by the Linux kernel networking layer and network interface bandwidth - Supports multiple chained relays as long as each has a public IP - TLS encryption between the relays and dispatcher - T.38 fax support - Graceful shutdown capability - Automatic load balancing and redundancy among all media relays - Real-time sessions statistics - Configurable IP and UDP port range - Support for any combination of audio and video streams - Ability to use OpenSIPS' MI interface to close a call that did timeout - Radius accounting of IP network traffic - Database accounting of complete media information including all streams, their type, codecs and duration. - Supports ICE negotiation by behaving like a TURN relay candiate + - Supports routing media between multiple interfaces Background ----------- MediaProxy 2.0 is the second generation media relay application which is based on a completely new design that allows for major improvements in areas such as scalability (an order of magnitude more scalable than previous version) and security (communication between relay and dispatcher is encrypted). New features have been added to support call flows related to user mobility and fax transmission. Architecture ------------ MediaProxy consists of a dispatcher and one or more media relays. The dispatcher component always runs on the same host as OpenSIPS and communicates with its mediaproxy module through a UNIX domain socket. The relay(s) connect to the dispatcher using TLS. This relay component may be on the same or on a different host as OpenSIPS. There may be several relays for the dispatcher to choose from and a relay may service more than one dispatcher. When OpenSIPS requests that a call be relayed, the dispatcher will forward this request to one of its connected relays, along with some data from the SDP. The relay will allocate a set of UDP ports for this session, depending on the number of proposed streams. It will inform the dispatcher which ports it has allocated so that it may in turn notify the mediaproxy module of OpenSIPS, which will replace the relevant parts of the SDP. The same is done for any SIP messages from the callee, thus all the media streams will be sent through the relay. When the session between caller and callee has finished, either through a SIP BYE or because the media is no longer flowing and has timed out, the relay will send session information to the dispatcher, which can store this information using one or more accounting modules. The session information may also be queried using a management interface on the dispatcher. All of this is illustrated in the following diagram: +---+ +---+ | | +---------------------+ | | | | | SIP Proxy | | | | | | +----------+ | SIP | | | |<--+->| OpenSIPS |<------+------------------->| | | | | +----------+ | | | | | | ^ | | | | | | | UNIX socket | | | | C | | v | | C | | A | | +------------+ | +------------+ | A | | L | | | Dispatcher |<-----+-->| Management | | L | | L | | +------------+ TCP | | client | | L | | E | | ^ /TLS | +------------+ | E | | R | | | | | E | | | +---------+-----------+ | | | | | | | | | | TLS | | | | v | | | | +-------------+ UDP | | | |<---->| Relay |<----------------------->| | | | +-------------+ RTP / RTCP | | +---+ +---+ Please note that the accounting modules are not shown. Compatibility and pre-requisites -------------------------------- Both OpenSIPS and MediaProxy must use a public IP address. To run the software, you will need a server running the Linux Operating System using a kernel version 2.6.18 or higher that has been compiled with connection tracking support (conntrack). IPtables 1.4.3 or higher is also required. Because of this dependency on Linux, other operating systems are not supported. This dependency only applies to the media relay component. The dispatcher component which runs on the same host as OpenSIPS, can run on any platform that has a python interpreter and supports the twisted framework. Communication between the dispatcher and the relays uses TLS encryption and requires a set of X509 certificates to work. For more information about this please read tls/README which contains information about the sample certificates that are included as well as information about how to generate your own. MediaProxy is meant to be used together with OpenSIPS' mediaproxy module. This version of MediaProxy (2.0 or higher) cannot be used in combination with any version of OpenSIPS older than 1.4 or any components of MediaProxy older than 2.0. You must completely upgrade any previous installation of OpenSER to OpenSIPS to use this version of MediaProxy. No STUN or TURN support are required in the clients. The SIP User Agents must work symmetrically (that is to send and receive data on the same port for each stream), which is documented in RFC 4961. To display the history of the media streams CDRTool 6.5 or higher is required. Some features that were present in the previous version have been removed: - Support for specifying media relays per domain has been discontinued - Support for DNS records has been discontinued - Support for asymmetric clients has been discontinued - Support for other operating systems than Linux has been discontinued (only for the media relay, as the dispatcher has no such limitation) For information of how to install MediaProxy, please consult the INSTALL file. Important note -------------- For Linux kernels >= 4.9 and < 5.1 you must add a rule to trigger the connection tracking: sudo iptables -I INPUT -m state --state NEW Starting with kernel 5.1 you can enable enable_hooks parameter: modprobe nf_conntrack enable_hooks=1 or use the iptables rule above. For more information about this requirement see: https://github.com/torvalds/linux/commit/ba3fbe663635ae7b33a2d972c5d2def036258e42 Operation --------- Before the relay is run, please make sure that /proc/sys/net/ipv4/ip_forward is set to "1". Also for newer kernels ACCT on connection tracking needs to be enabled. Therefore /proc/sys/net/netfilter/nf_conntrack_acct must be set to "1". Both the dispatcher and the relay should be executed with root privileges. With no arguments, both applications will automatically fork into the background and log to syslog. They can remain in the foreground and log to console when given the --no-fork argument. The relay can be shut down in two ways. When receiving either an INT or TERM signal, the relay will terminate all of its sessions immediately and inform the dispatcher that those sessions have expired. When given the HUP signal, it will not accept any new sessions from the dispatcher and wait for all of the running sessions to expire, thus terminating gracefully. At the very least a set of TLS credentials is required. Sample certificates for this are included in the tls/ subdirectory. DO NOT USE THESE IN A PRODUCTION ENVIRONMENT, but only for testing purposes. For more information about TLS certificates and how to generate your own, check the tls/README file. Accounting ---------- MediaProxy is capable to do additional per call accounting with information related to the media streams used by the call. MediaProxy has a modular interface to the accounting system, allowing for new modules to be easily implemented. Currently it supports database and radius backends. Multiple backends can be configured and used simultaneously. Radius accounting ----------------- The radius backend logs very basic information about the media streams. The limited nature of the logged information is mainly given by the limitations imposed by the radius protocol to the data size. The information sent in the radius packet is shown below: Acct-Status-Type = "Update" User-Name = "mediaproxy@default" Acct-Session-Id = call_id Sip-From-Tag = from_tag Sip-To-Tag = to_tag Acct-Session-Time = call duration Acct-Input-Octets = bytes received from caller Acct-Output-Octets = bytes received from callee NAS-IP-Address = media-relay address Sip-User-Agents = caller + callee user agents Sip-Applications = "Audio", "Video", ... Media-Codecs = codecs used by streams (comma separated) Media-Info = "timeout" or "" Acct-Delay-Time = post dial delay (seconds from INVITE to 1st media packet) Database accounting ------------------- The database backend logs all the information related to the media streams that were created/closed during the whole session. This information is stored as a JSON encoded string in a BLOB column in the database, along with the call_id, from_tag and to_tag columns that can be used to retrieve the media information for a given call. The database table and column names are fully configurable in the database section of the configuration file. The table used to store these records, is automatically created by the media dispatcher on startup, if it's not present. For this to happen, the user that is configured in the dburi option in the database section, must have the CREATE and ALTER rights on the database specified in the same dburi. If this is not possible, then the media dispatcher will log an error indicating why it could not create the table and also output the table definition that can be used by some human operator to manually create the table. However, the recommended way is to grant the CREATE and ALTER privileges to the user in the dburi over the database specified in the same dburi. The database module uses SQLObject to access the database, which means it can work with a lot of databases, by simply changing the scheme in the dburi. Currently the following databases are supported: mysql, postgres, sqlite, firebird, maxdb, mssql and sybase. Closing expired calls --------------------- Starting with version 2.1.0, MediaProxy supports closing calls for which all the media streams did timeout, but for which no BYE was received to close the call in the standard way. This feature will only work, when the OpenSIPS mediaproxy module uses the engage_media_proxy() command to start MediaProxy for a given call. In this case the mediaproxy module uses the dialog module to keep track of the call and can pass the dialog id to the media dispatcher. When a media session is expired because all streams did timeout, but no closing request was received from the proxy, the media dispatcher will use the dialog id that was received from the mediaproxy module, to issue a dlg_end_dlg request into the OpenSIPS' MI interface, instructing OpenSIPS to generate the BYEs for the call, closing it in a clean way and generating the accounting records. To use this, the mi_datagram module must be loaded and configured to use a UNIX filesystem socket which must also be configured into the OpenSIPS section of the MediaProxy configuration as socket_path. This feature is not available when using the use_media_proxy/end_media_session functions in the proxy configuration, because in that case there is no dialog that is tracked by the proxy which could be terminated using dlg_end_dlg. +Multiple interfaces +------------------- + +When using MediaProxy, the default IP address of the relay machine will appear +in the c line of the SDP proposed to each party. + +On systems with multiple network interfaces, this IP address can be +automatically set with the IP addresss that coresponds to the interface that +has a route for the IP adress of each side of the call. + +In order to decide which network interface should be used, the mp_signaling_ip +avp in OpenSIPS configuration should be set as follows: + +$avp(mp_signaling_ip) = sourceIP_destinationIP + +The sourceIP is the IP address where the SIP INVITE originated from. The +destinationIP is the IP address where the SIP INVITE will be sent to. + +If destinationIP is not known, $avp(mp_signaling_ip) can be set only to +sourceIP. Otherwise, if the avp is not set, the source IP address of the +original SIP INVITE packet will be used. + +This behaviour can be enabled my setting auto_detect_interfaces to True in the +relay configuration. + +The IP address can also be always overwritten by configuring advertised_ip in +the relay configuration. If so, auto_detect_interfaces setting has no effect. + + Gracefull shutdown ------------------ To tell media-relay component to gracefully shutdown when using systemd: sudo systemctl reload mediaproxy-relay The reload command will send the HUP signal to the PID of the relay component and the software will shutdown when the last relayed call has ended. Management interface -------------------- The management interface will accept commands terminated by \r\n. It will return the results of the command, one per line, terminated by an empty line (also \r\n terminated). Currently two commands are supported: sessions : This will have the dispatcher query all of its connected relays for active sessions. For every sessions it finds it will return one line with a JSON encoded dictionary containing session information. summary : This will have the dispatcher present a summary of each of its connected relays. The results are returned as a JSON encoded dictionary, one line per relay. Free support ------------ MediaProxy is developed and supported by AG Projects. AG Projects offers best-effort free support for MediaProxy. "best-effort" means that we try to solve the bugs you report or help fix your problems as soon as we can, subject to available resources. You may report bugs or feature request to: users@lists.opensips.org A mailing list archive is available at: http://lists.opensips.org/cgi-bin/mailman/listinfo/users Commercial support ------------------ Visit http://ag-projects.com diff --git a/config.ini.sample b/config.ini.sample index 3913f32..538cf73 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,234 +1,240 @@ [Relay] ; A list of dispatchers to connect to, separated by spaces. The format is ; "host[:port] [host[:port] ...]". If a port is not specified the default port ; of 25060 will be used. "host" can be one of the following: ; - A domain name that has a SRV record for a SIP proxy, i.e. at ; "_sip._udp.". If the DNS lookup for this succeeds the relay ; will connect to the IP address of the SIP proxy on the port specified in ; this configuration. ; - A hostname. The lookup for this will be performed if the SRV lookup ; fails. ; - An IP address. The relay will connect directly to this address. ; Both the SRV and hostname lookups will be periodically refreshed (see ; "dns_check_interval" below). ; ;dispatchers = example.com 1.2.3.4:12345 ; Specify extra checks to be performed on the dispatcher TLS credentials before ; considering the connection with the dispatcher successful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; dispatcher certificate. A wildcard (*) can be used in the value at the ; beginning or the end of the string to indicate that the corresponding ; attribute from the dispatcher certificate must end with respectively to ; start with the given string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:*dispatcher ; means that a connection with a dispatcher will only be accepted if the ; dispatcher certificate subject has organization set to "AG Projects" and ; the common name ends with "dispatcher". To specify that no additional ; identity checks need to be performed, use the keyword None. If passport ; is None, then only the certificate signature is verified against the ; certificate authority in tls/ca.pem (signature is always verified even ; when passport is None). ; ; Default value is None. ; ;passport = None ; The host IP address used for relaying streams. The default for this value ; is to use the IP address of the interface that has the default route. This ; is the most appropriate choice for almost any situation. Unless you need to ; use a very specific interface, which is not the default one, there is no need ; to set this option. Leave this option commented to use the default value. ;relay_ip = +; The IP address of the relay can be replaced with the IP address of the +; interface coreponding to the sourceIP and destinationIP of the call. +; if set to False the relay_ip will be used instead +;auto_detect_interfaces = False + ; The host IP address to return when a session is allocated in the relay. This ; could be of use in case the relay is behind NAT but it has a 1 to 1 mapping ; with a public IP address, like Amazon EC2, for example. +; If set, auto_detect_interfaces setting will be ignored. ;advertised_ip = ; The port range to use for relaying media streams in the form start:end with ; start and end being even numbers in the [1024, 65536] range and start < end ; The default range is 50000:60000. You should allocate 4 times the number of ; streams you plan for the relay to handle simultaneously. The default range ; having 10000 ports, is able to handle up to 2500 streams. ; ;port_range = 50000:60000 ; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG) ;log_level = INFO ; The amount of time to wait for a stream in a new SDP offer to start sending ; data before the relay decides that it has timed out. The default value is 90 ; seconds. This only applies to the initial setup stage, before the first ; packet for a stream is received (from both ends). After the stream is started ; and the conntrack rule is in place, the idle timeout (how long before the ; conntrack rule expires when no traffic is received) is controlled by a kernel ; setting that defaults to 180 seconds and can be adjusted in: ; /proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream ; ;stream_timeout = 90 ; Amount of time a call can be on hold before it is declared expired by the ; relay. The default value is 7200 seconds (2 hours). ; ;on_hold_timeout = 7200 ; How often to check in DNS if the SRV and A records for the dispatcher have ; changed. Interval is in seconds and the default value is 60 seconds. ; ;dns_check_interval = 60 ; If the relay cannot connect to a dispatcher is should retry after this ; amount of seconds. The default value is 10 seconds. ; ;reconnect_delay = 10 ; How often to sample the aggregate amount of data processed by the relay, in ; order to compute an average of the relayed traffic over that period. The ; value is expressed in seconds and the default value is 15 seconds. ; Use 0 to disable it in case you have to many streams processed by the relay ; and it warns you in syslog that gathering this information takes too long. ; ;traffic_sampling_period = 15 ; Specify a list of network ranges (in CIDR notation) for which media is relayed ; even if no packet was received from the endpoint and the IP address is private. ;routable_private_ranges = 192.168.1.0/24 [Dispatcher] ; Local socket on which to communicate with OpenSIPS. The OpenSIPS mediaproxy ; module should be configured to connect to this socket. If a relative path, ; the runtime directory will be prepended. Default value is dispatcher.sock. ; ;socket_path = dispatcher.sock ; Listen address for incoming connections from the relays. The format is ; "ip[:port]". If the ip is "0.0.0.0" or the keyword "any", the dispatcher ; will listen on all interfaces of this host. If the port is not specified, ; the dispatcher will listen on the default port of 25060. ; ;listen = 0.0.0.0 ; Listen address for incoming management interface connections. Clients can ; connect to this and issue commands to query the status of the relays and ; their sessions. The format is "ip[:port]". If the ip is "0.0.0.0" or the ; keyword "any", the dispatcher will listen on all interfaces of this host. ; If the port is not specified, the dispatcher will listen on the default ; port of 25061. ; ;listen_management = 0.0.0.0 ; Whether or not to use TLS on the management interface. Note that the same ; TLS credentials are used for both the relay and the management interface ; connections. ; ; Default value is yes. ; ;management_use_tls = yes ; Specify extra checks to be performed on the relay TLS credentials before ; considering the connection with the relay successful. The passport is ; specified as a list of attribute/value pairs in the form: ; AN:value[, AN:value...] ; where the attribute name (AN) is one of the available attribute names from ; the X509 certificate subject: O, OU, CN, C, L, ST, EMAIL. The value is a ; string that has to match with the corresponding attribute value from the ; relay certificate. A wildcard (*) can be used in the value at the beginning ; or the end of the string to indicate that the corresponding attribute from ; the relay certificate must end with respectively to start with the given ; string (excluding the wildcard). ; For example using this passport: ; passport = O:AG Projects, CN:relay* ; means that a connection with a relay will only be accepted if the relay ; certificate subject has organization set to "AG Projects" and the common ; name starts with "relay". To specify that no additional identity checks ; need to be performed, use the keyword None. If passport is None, then only ; the certificate signature is verified against the certificate authority in ; tls/ca.pem (signature is always verified even when passport is None). ; ; Default value is None. ; ;passport = None ; This option is similar to passport above, but applies to the management ; interface connections instead of relay connections. It specifies extra ; checks to be performed on the TLS credentials supplied by an entity that ; connects to the management interface. Please consult passport above for ; a detailed description of the possible values for this option. ; ; If management_use_tls is false, this option is ignored. ; ; Default value is None. ; ;management_passport = None ; Logging level (one of CRITICAL, ERROR, WARNING, INFO or DEBUG) ;log_level = INFO ; Timeout value in second for individual relays. When a command is sent from ; the dispatcher to a relay it will wait this amount of seconds for a reply. ; The default is 5 seconds. ; ;relay_timeout = 5 ; A comma separated list of accounting backends that will be used to save ; accounting data with the session information once a session has finished. ; Currently 2 backends are available: "radius" and "database". If enabled ; they can be configured below in their respective sections. The default ; is to use no accounting backend. ; ;accounting = [TLS] ; Path to the certificates. If relative, it will be looked up in both the ; application directory (for a standalone installation) and /etc/mediaproxy, ; the former taking precedence if found. ; ;certs_path = tls ; How often (in seconds) to verify the peer certificate for expiration and ; revocation. Default value is 300 seconds (5 minutes) ; ;verify_interval = 300 [Database] ; This section needs to be configured if database accounting is enabled ; Database URI in the form: scheme://user:password@host/database ;dburi = mysql://mediaproxy:CHANGEME@localhost/mediaproxy ; Name for the table. ;sessions_table = media_sessions ; Column names. Columns are strings except for info which is a BLOB ; ;callid_column = call_id ;fromtag_column = from_tag ;totag_column = to_tag ;info_column = info [Radius] ; This section needs to be configured if radius accounting is enabled ; OpenSIPS RADIUS configuration file. All RADIUS configuration parameters ; will be read from this file, including dictionary files. ; ;config_file = /etc/opensips/radius/client.conf ; Additional dictionary file with MediaProxy specific attributes. ;additional_dictionary = radius/dictionary [OpenSIPS] ; Configure interaction between the media dispatcher and OpenSIPS ; Path to OpenSIPS's UNIX filesystem socket from the mi_datagram module. ;socket_path = /run/opensips/socket diff --git a/mediaproxy/configuration/__init__.py b/mediaproxy/configuration/__init__.py index f7599fb..60a3ba6 100644 --- a/mediaproxy/configuration/__init__.py +++ b/mediaproxy/configuration/__init__.py @@ -1,90 +1,91 @@ from application import log from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import IPAddress, LogLevel, NetworkRangeList from application.system import host from mediaproxy import configuration_file from mediaproxy.configuration.datatypes import AccountingModuleList, DispatcherIPAddress, DispatcherAddressList, DispatcherManagementAddress, PortRange, PositiveInteger, SIPThorDomain, X509NameValidator class DispatcherConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Dispatcher' socket_path = 'dispatcher.sock' listen = ConfigSetting(type=DispatcherIPAddress, value=DispatcherIPAddress('any')) listen_management = ConfigSetting(type=DispatcherManagementAddress, value=DispatcherManagementAddress('any')) relay_timeout = 5 # How much to wait for an answer from a relay relay_recover_interval = 60 # How much to wait for an unresponsive relay to recover, before disconnecting it cleanup_dead_relays_after = 43200 # 12 hours cleanup_expired_sessions_after = 86400 # 24 hours management_use_tls = True accounting = ConfigSetting(type=AccountingModuleList, value=[]) passport = ConfigSetting(type=X509NameValidator, value=None) management_passport = ConfigSetting(type=X509NameValidator, value=None) log_level = ConfigSetting(type=LogLevel, value=log.level.INFO) class RelayConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Relay' relay_ip = ConfigSetting(type=IPAddress, value=host.default_ip) advertised_ip = ConfigSetting(type=IPAddress, value=None) + auto_detect_interfaces = False stream_timeout = 90 on_hold_timeout = 7200 traffic_sampling_period = 15 userspace_transmit_every = 1 dispatchers = ConfigSetting(type=DispatcherAddressList, value=[]) port_range = PortRange('50000:60000') dns_check_interval = PositiveInteger(60) keepalive_interval = PositiveInteger(10) reconnect_delay = PositiveInteger(10) passport = ConfigSetting(type=X509NameValidator, value=None) routable_private_ranges = ConfigSetting(type=NetworkRangeList, value=[]) log_level = ConfigSetting(type=LogLevel, value=log.level.INFO) class OpenSIPSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'OpenSIPS' socket_path = '/run/opensips/socket' location_table = 'location' class RadiusConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Radius' config_file = '/etc/opensips/radius/client.conf' additional_dictionary = 'radius/dictionary' class DatabaseConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'Database' dburi = '' sessions_table = 'media_sessions' callid_column = 'call_id' fromtag_column = 'from_tag' totag_column = 'to_tag' info_column = 'info' class TLSConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'TLS' certs_path = 'tls' verify_interval = 300 class ThorNetworkConfig(ConfigSection): __cfgfile__ = configuration_file __section__ = 'ThorNetwork' domain = ConfigSetting(type=SIPThorDomain, value=None) node_ip = host.default_ip diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py index 3d7d0f7..c218b22 100644 --- a/mediaproxy/mediacontrol.py +++ b/mediaproxy/mediacontrol.py @@ -1,839 +1,856 @@ import hashlib import struct from application import log +from application.system import host from base64 import b64encode as base64_encode from itertools import chain from collections import deque from operator import attrgetter from time import time from twisted.internet import reactor from twisted.internet.interfaces import IReadDescriptor from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.log import Logger from zope.interface import implementer from mediaproxy.configuration import RelayConfig from mediaproxy.interfaces.system import _conntrack from mediaproxy.iputils import is_routable_ip from mediaproxy.scheduler import RecurrentCall, KeepRunning UDP_TIMEOUT_FILE = '/proc/sys/net/netfilter/nf_conntrack_udp_timeout_stream' rtp_payloads = { 0: 'G711u', 1: '1016', 2: 'G721', 3: 'GSM', 4: 'G723', 5: 'DVI4', 6: 'DVI4', 7: 'LPC', 8: 'G711a', 9: 'G722', 10: 'L16', 11: 'L16', 14: 'MPA', 15: 'G728', 18: 'G729', 25: 'CelB', 26: 'JPEG', 28: 'nv', 31: 'H261', 32: 'MPV', 33: 'MP2T', 34: 'H263' } class RelayPortsExhaustedError(Exception): pass if RelayConfig.relay_ip is None: raise RuntimeError('Could not determine default host IP; either add default route or specify relay IP manually') class SessionLogger(log.ContextualLogger): def __init__(self, session): super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend self.session_id = session.call_id def apply_context(self, message): return '[session {0.session_id}] {1}'.format(self, message) if message != '' else '' class Address(object): """Representation of an endpoint address""" def __init__(self, host, port, in_use=True, got_rtp=False): self.host = host self.port = port self.in_use = self.__bool__() and in_use self.got_rtp = got_rtp def __len__(self): return 2 def __bool__(self): return None not in (self.host, self.port) def __getitem__(self, index): return (self.host, self.port)[index] def __contains__(self, item): return item in (self.host, self.port) def __iter__(self): yield self.host yield self.port def __str__(self): return self.__bool__() and ('%s:%d' % (self.host, self.port)) or 'Unknown' def __repr__(self): return '%s(%r, %r, in_use=%r, got_rtp=%r)' % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp) def forget(self): self.host, self.port, self.in_use, self.got_rtp = None, None, False, False @property def unknown(self): return None in (self.host, self.port) @property def obsolete(self): return self.__bool__() and not self.in_use class Counters(dict): def __add__(self, other): n = Counters(self) for k, v in other.items(): n[k] += v return n def __iadd__(self, other): for k, v in other.items(): self[k] += v return self @property def caller_bytes(self): return self['caller_bytes'] @property def callee_bytes(self): return self['callee_bytes'] @property def caller_packets(self): return self['caller_packets'] @property def callee_packets(self): return self['callee_packets'] @property def relayed_bytes(self): return self['caller_bytes'] + self['callee_bytes'] @property def relayed_packets(self): return self['caller_packets'] + self['callee_packets'] class StreamListenerProtocol(DatagramProtocol): noisy = False def __init__(self): self.cb_func = None self.sdp = None self.send_packet_count = 0 self.stun_queue = [] def datagramReceived(self, data, addr): (host, port) = addr if self.cb_func is not None: self.cb_func(host, port, data) def set_remote_sdp(self, ip, port): if is_routable_ip(ip): self.sdp = ip, port else: self.sdp = None def send(self, data, is_stun, ip=None, port=None): if is_stun: self.stun_queue.append(data) if ip is None or port is None: # this means that we have not received any packets from this host yet, # so we have not learnt its address if self.sdp is None: # we can't do anything if we haven't received the SDP IP yet or # it was in a private range return ip, port = self.sdp # we learnt the IP, empty the STUN packets queue if self.stun_queue: for data in self.stun_queue: self.transport.write(data, (ip, port)) self.stun_queue = [] if not is_stun: if not self.send_packet_count % RelayConfig.userspace_transmit_every: self.transport.write(data, (ip, port)) self.send_packet_count += 1 def _stun_test(data): # Check if data is a STUN request and if it's a binding request if len(data) < 20: return False, False msg_type, msg_len, magic = struct.unpack('!HHI', data[:8]) if msg_type & 0xc == 0 and magic == 0x2112A442: if msg_type == 0x0001: return True, True else: return True, False else: return False, False class MediaSubParty(object): def __init__(self, substream, listener): self.substream = substream self.logger = substream.logger self.listener = listener self.listener.protocol.cb_func = self.got_data self.remote = Address(None, None) host = self.listener.protocol.transport.getHost() self.local = Address(host.host, host.port) self.timer = None self.codec = 'Unknown' self.got_stun_probing = False self.reset() def reset(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) self.remote.in_use = False # keep remote address around but mark it as obsolete self.remote.got_rtp = False self.got_stun_probing = False self.listener.protocol.send_packet_count = 0 def before_hold(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, 'on hold timeout', RelayConfig.on_hold_timeout) def after_hold(self): if self.timer and self.timer.active(): self.timer.cancel() if not self.remote.in_use: self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) def got_data(self, host, port, data): if (host, port) == tuple(self.remote): if self.remote.obsolete: # the received packet matches the previously used IP/port, # which has been made obsolete, so ignore it return else: if self.remote.in_use: # the received packet is different than the recorded IP/port, # so we will discard it return # we have learnt the remote IP/port self.remote.host, self.remote.port = host, port self.remote.in_use = True self.logger.info('discovered peer: %s' % self.substream.stream) is_stun, is_binding_request = _stun_test(data) self.substream.send_data(self, data, is_stun) if not self.remote.got_rtp and not is_stun: # This is the first RTP packet received self.remote.got_rtp = True if self.timer: if self.timer.active(): self.timer.cancel() self.timer = None if self.codec == 'Unknown' and self.substream is self.substream.stream.rtp: try: pt = data[1] & 127 except IndexError: pass else: if pt > 95: self.codec = 'Dynamic(%d)' % pt elif pt in rtp_payloads: self.codec = rtp_payloads[pt] else: self.codec = 'Unknown(%d)' % pt self.substream.check_create_conntrack() if is_binding_request: self.got_stun_probing = True def cleanup(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = None self.listener.protocol.cb_func = None self.substream = None class MediaSubStream(object): def __init__(self, stream, listener_caller, listener_callee): self.stream = stream self.logger = stream.logger self.forwarding_rule = None self.caller = MediaSubParty(self, listener_caller) self.callee = MediaSubParty(self, listener_callee) self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0) @property def counters(self): """Accumulated counters from all the forwarding rules the stream had""" if self.forwarding_rule is None: return self._counters else: try: self.logger.debug(', '.join([f"{key}={self.forwarding_rule.counters[key]}" for key in self.forwarding_rule.counters.keys()])) return self._counters + self.forwarding_rule.counters except _conntrack.Error: return self._counters def _stop_relaying(self): if self.forwarding_rule is not None: try: self.logger.info(', '.join([f"{key}={self.forwarding_rule.counters[key]}" for key in self.forwarding_rule.counters.keys()])) self._counters += self.forwarding_rule.counters except _conntrack.Error: pass self.forwarding_rule = None def reset(self, party): if party == 'caller': self.caller.reset() else: self.callee.reset() self._stop_relaying() def check_create_conntrack(self): if self.stream.first_media_time is None: self.stream.first_media_time = time() if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp: self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark) self.forwarding_rule.expired_func = self.conntrack_expired def send_data(self, source, data, is_stun): if source is self.caller: dest = self.callee else: dest = self.caller if dest.remote: # if we have already learnt the remote address of the destination, use that ip, port = dest.remote.host, dest.remote.port dest.listener.protocol.send(data, is_stun, ip, port) else: # otherwise use the IP/port specified in the SDP, if public dest.listener.protocol.send(data, is_stun) def conntrack_expired(self): try: timeout_wait = int(open(UDP_TIMEOUT_FILE).read()) except: timeout_wait = 0 self.expired('conntrack timeout', timeout_wait) def expired(self, reason, timeout_wait): self._stop_relaying() self.stream.substream_expired(self, reason, timeout_wait) def cleanup(self): self.caller.cleanup() self.callee.cleanup() self._stop_relaying() self.stream = None class MediaParty(object): - def __init__(self, stream): + def __init__(self, stream, party): self.manager = stream.session.manager self.logger = stream.logger self._remote_sdp = None self.is_on_hold = False self.uses_ice = False while True: self.listener_rtp = None self.ports = port_rtp, port_rtcp = self.manager.get_ports() + listen_ip = None + if RelayConfig.auto_detect_interfaces and not RelayConfig.advertised_ip: + if party == 'callee' and stream.session.destination_ip: + listen_ip = host.outgoing_ip_for(stream.session.destination_ip) + else: + listen_ip = host.outgoing_ip_for(stream.session.caller_ip) + try: - self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) - self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) + self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=listen_ip or RelayConfig.relay_ip) + self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=listen_ip or RelayConfig.relay_ip) except CannotListenError: if self.listener_rtp is not None: self.listener_rtp.stopListening() self.manager.set_bad_ports(self.ports) self.logger.warning('Cannot use port pair %d/%d' % self.ports) else: break def _get_remote_sdp(self): return self._remote_sdp def _set_remote_sdp(self, addr): (ip, port) = addr self._remote_sdp = ip, port self.listener_rtp.protocol.set_remote_sdp(ip, port) remote_sdp = property(_get_remote_sdp, _set_remote_sdp) def cleanup(self): self.listener_rtp.stopListening() self.listener_rtcp.stopListening() self.manager.free_ports(self.ports) self.manager = None class MediaStream(object): def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party): self.is_alive = True self.session = session # type: Session self.logger = session.logger self.media_type = media_type - self.caller = MediaParty(self) - self.callee = MediaParty(self) + self.caller = MediaParty(self, 'caller') + self.callee = MediaParty(self, 'callee') self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp) self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp) getattr(self, initiating_party).remote_sdp = (media_ip, media_port) getattr(self, initiating_party).uses_ice = (media_parameters.get('ice', 'no') == 'yes') self.check_hold(initiating_party, direction, media_ip) self.create_time = time() self.first_media_time = None self.start_time = None self.end_time = None self.status = 'active' self.timeout_wait = 0 def __str__(self): if self.caller.remote_sdp is None: src = 'Unknown' else: src = '%s:%d' % self.caller.remote_sdp if self.caller.is_on_hold: src += ' ON HOLD' if self.caller.uses_ice: src += ' (ICE)' if self.callee.remote_sdp is None: dst = 'Unknown' else: dst = '%s:%d' % self.callee.remote_sdp if self.callee.is_on_hold: dst += ' ON HOLD' if self.callee.uses_ice: dst += ' (ICE)' rtp = self.rtp rtcp = self.rtcp return '(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)' % ( self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote) @property def counters(self): return self.rtp.counters + self.rtcp.counters @property def is_on_hold(self): return self.caller.is_on_hold or self.callee.is_on_hold def check_hold(self, party, direction, ip): previous_hold = self.is_on_hold party = getattr(self, party) if direction == 'sendonly' or direction == 'inactive': party.is_on_hold = True elif ip == '0.0.0.0': party.is_on_hold = True else: party.is_on_hold = False if previous_hold and not self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = 'active' subparty.after_hold() if not previous_hold and self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = 'on hold' subparty.before_hold() def reset(self, party, media_ip, media_port): self.rtp.reset(party) self.rtcp.reset(party) getattr(self, party).remote_sdp = (media_ip, media_port) def substream_expired(self, substream, reason, timeout_wait): if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice: reason = 'unselected ICE candidate' self.logger.info('RTP stream expired: {}'.format(reason)) if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing: self.logger.info('unselected ICE candidate, but no STUN was received') if substream is self.rtcp: # Forget about the remote addresses, this will cause any # re-occurrence of the same traffic to be forwarded again substream.caller.remote.forget() substream.caller.listener.protocol.send_packet_count = 0 substream.callee.remote.forget() substream.callee.listener.protocol.send_packet_count = 0 else: session = self.session self.cleanup(reason) self.timeout_wait = timeout_wait session.stream_expired(self) def cleanup(self, status='closed'): if self.is_alive: self.is_alive = False self.status = status self.caller.cleanup() self.callee.cleanup() self.rtp.cleanup() self.rtcp.cleanup() self.session = None self.end_time = time() class Session(object): - def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0): + def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0, caller_ip=None, destination_ip=None): self.manager = manager self.dispatcher = dispatcher self.session_id = base64_encode(hashlib.md5(call_id.encode()).digest()).rstrip(b'=') self.call_id = call_id + self.caller_ip = caller_ip + self.destination_ip = destination_ip self.from_tag = from_tag self.to_tag = None self.mark = mark self.from_uri = from_uri self.to_uri = to_uri self.caller_ua = None self.callee_ua = None self.cseq = None self.previous_cseq = None self.streams = {} self.start_time = None self.end_time = None self.logger = SessionLogger(self) self.logger.info('created: from-tag {0.from_tag})'.format(self)) self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq) def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq): if self.cseq is None: old_cseq = (0, 0) else: old_cseq = self.cseq if is_caller_cseq: cseq = (cseq, old_cseq[1]) if self.to_tag is None and to_tag is not None: self.to_tag = to_tag else: cseq = (old_cseq[0], cseq) if is_downstream: party = 'caller' if self.caller_ua is None: self.caller_ua = user_agent else: party = 'callee' if self.callee_ua is None: self.callee_ua = user_agent if self.cseq is None or cseq > self.cseq: if not media_list: return self.logger.info('got SDP offer') self.streams[cseq] = new_streams = [] if self.cseq is None: old_streams = [] else: old_streams = self.streams[self.cseq] for media_type, media_ip, media_port, media_direction, media_parameters in media_list: for old_stream in old_streams: old_remote = getattr(old_stream, party).remote_sdp if old_remote is not None: old_ip, old_port = old_remote else: old_ip, old_port = None, None if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))): stream = old_stream stream.check_hold(party, media_direction, media_ip) if media_port == 0: self.logger.info('disabled stream: %s', stream) else: self.logger.info('retained stream: %s', stream) break else: stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party) self.logger.info('proposed stream: %s' % stream) if media_port == 0: stream.cleanup() new_streams.append(stream) if self.previous_cseq is not None: for stream in self.streams[self.previous_cseq]: if stream not in self.streams[self.cseq] + new_streams: stream.cleanup() self.previous_cseq = self.cseq self.cseq = cseq elif self.cseq == cseq: self.logger.info('got SDP answer') now = time() if self.start_time is None: self.start_time = now current_streams = self.streams[cseq] for stream in current_streams: if stream.start_time is None: stream.start_time = now if to_tag is not None and not media_list: return if len(media_list) < len(current_streams): for stream in current_streams[len(media_list):]: self.logger.info('removed! stream: %s' % stream) stream.cleanup('rejected') for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list): if stream.media_type != media_type: raise ValueError('Media types do not match: %r and %r' % (stream.media_type, media_type)) if media_port == 0: if stream.is_alive: self.logger.info('rejected stream: %s' % stream) else: self.logger.info('disabled stream: %s' % stream) stream.cleanup('rejected') continue stream.check_hold(party, media_direction, media_ip) party_info = getattr(stream, party) party_info.uses_ice = (media_parameters.get('ice', 'no') == 'yes') if party_info.remote_sdp is None or party_info.remote_sdp[0] == '0.0.0.0': party_info.remote_sdp = (media_ip, media_port) self.logger.info('accepted stream: %s' % stream) else: if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'): stream.reset(party, media_ip, media_port) self.logger.info('updating stream: %s' % stream) else: self.logger.info('retained stream: %s' % stream) if self.previous_cseq is not None: for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]: self.logger.info('removing stream: %s' % stream) stream.cleanup() else: self.logger.info('got old CSeq %d:%d, ignoring' % cseq) def get_local_media(self, is_downstream, cseq, is_caller_cseq): if is_caller_cseq: pos = 0 else: pos = 1 try: cseq = max(key for key in list(self.streams.keys()) if key[pos] == cseq) except ValueError: return None if is_downstream: retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]] else: retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]] + self.logger.info('SDP media ip for %s set to %s:%d' % ("callee" if is_downstream else "caller", retval[0][0], retval[0][1])) return retval def cleanup(self): self.end_time = time() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: for stream in self.streams[cseq]: stream.cleanup() def stream_expired(self, stream): active_streams = set() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: active_streams.update({stream for stream in self.streams[cseq] if stream.is_alive}) if len(active_streams) == 0: self.manager.session_expired(self.call_id, self.from_tag) @property def duration(self): if self.start_time is not None: if self.end_time is not None: return int(self.end_time - self.start_time) else: return int(time() - self.start_time) else: return 0 @property def relayed_bytes(self): return sum(stream.counters.relayed_bytes for stream in set(chain(*iter(self.streams.values())))) @property def statistics(self): all_streams = set(chain(*iter(self.streams.values()))) attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration') stats = dict((name, getattr(self, name)) for name in attributes) stats['caller_ua'] = self.caller_ua or 'Unknown' stats['callee_ua'] = self.callee_ua or 'Unknown' stats['streams'] = streams = [] stream_attributes = ('media_type', 'status', 'timeout_wait') streams_to_sort = [] for stream in all_streams: try: if stream and stream.start_time: streams_to_sort.append(stream) except AttributeError: pass for stream in sorted(streams_to_sort, key=attrgetter('start_time')): # type: MediaStream info = dict((name, getattr(stream, name)) for name in stream_attributes) info['caller_codec'] = stream.rtp.caller.codec info['callee_codec'] = stream.rtp.callee.codec if stream.start_time is None: info['start_time'] = info['end_time'] = None elif self.start_time is None: info['start_time'] = info['end_time'] = 0 else: info['start_time'] = max(int(stream.start_time - self.start_time), 0) if stream.status == 'rejected': info['end_time'] = info['start_time'] else: if stream.end_time is None: info['end_time'] = stats['duration'] else: info['end_time'] = min(int(stream.end_time - self.start_time), self.duration) if stream.first_media_time is None: info['post_dial_delay'] = None else: info['post_dial_delay'] = stream.first_media_time - stream.create_time caller = stream.rtp.caller callee = stream.rtp.callee info.update(stream.counters) info['caller_local'] = str(caller.local) info['callee_local'] = str(callee.local) info['caller_remote'] = str(caller.remote) info['callee_remote'] = str(callee.remote) streams.append(info) return stats class SessionManager(Logger): @implementer(IReadDescriptor) def __init__(self, relay, start_port, end_port): self.relay = relay self.ports = deque((i, i + 1) for i in range(start_port, end_port, 2)) self.bad_ports = deque() self.sessions = {} self.watcher = _conntrack.ExpireWatcher() self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement self.bps_relayed = 0 if RelayConfig.traffic_sampling_period > 0: self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed) else: self.speed_calculator = None reactor.addReader(self) def _measure_speed(self): start_time = time() current_byte_counter = sum(session.relayed_bytes for session in self.sessions.values()) self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period self.active_byte_counter = current_byte_counter self.closed_byte_counter = 0 us_taken = int((time() - start_time) * 1000000) if us_taken > 10000: log.warning('Aggregate speed calculation time exceeded 10ms: %d us for %d sessions' % (us_taken, len(self.sessions))) return KeepRunning # implemented for IReadDescriptor def fileno(self): return self.watcher.fd def doRead(self): stream = self.watcher.read() if stream: stream.expired_func() def connectionLost(self, reason): reactor.removeReader(self) # port management def get_ports(self): if len(self.bad_ports) > len(self.ports): log.debug('Excessive amount of bad ports, doing cleanup') self.ports.extend(self.bad_ports) self.bad_ports = deque() try: return self.ports.popleft() except IndexError: raise RelayPortsExhaustedError() def set_bad_ports(self, ports): self.bad_ports.append(ports) def free_ports(self, ports): self.ports.append(ports) # called by higher level def _find_session_key(self, call_id, from_tag, to_tag): key_from = (call_id, from_tag) if key_from in self.sessions: return key_from if to_tag: key_to = (call_id, to_tag) if key_to in self.sessions: return key_to return None def has_session(self, call_id, from_tag, to_tag=None, **kw): return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None) def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) + try: + (signaling_ip, destination_ip) = kw['signaling_ip'].split("_") + except ValueError: + signaling_ip = kw['signaling_ip'] + destination_ip = None + if key: session = self.sessions[key] is_downstream = (session.from_tag != from_tag) ^ (type == 'request') is_caller_cseq = (session.from_tag == from_tag) session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq) elif type == 'reply' and not media: return None else: is_downstream = type == 'request' is_caller_cseq = True - session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq) + session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq, caller_ip=signaling_ip, destination_ip=destination_ip) self.sessions[(call_id, from_tag)] = session self.relay.add_session(dispatcher) return session.get_local_media(is_downstream, cseq, is_caller_cseq) def remove_session(self, call_id, from_tag, to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) try: session = self.sessions[key] except KeyError: log.warning('The dispatcher tried to remove a session which is no longer present on the relay') return None session.logger.info('removed') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] reactor.callLater(0, self.relay.remove_session, session.dispatcher) return session def session_expired(self, call_id, from_tag): key = (call_id, from_tag) try: session = self.sessions[key] except KeyError: log.warning('A session expired but is no longer present on the relay') return session.logger.info('expired') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] self.relay.session_expired(session) self.relay.remove_session(session.dispatcher) def cleanup(self): if self.speed_calculator is not None: self.speed_calculator.cancel() for key in list(self.sessions.keys()): self.session_expired(*key) @property def statistics(self): return [session.statistics for session in self.sessions.values()] @property def stream_count(self): stream_count = {} for session in self.sessions.values(): for stream in set(chain(*iter(session.streams.values()))): if stream.is_alive: stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1 return stream_count