diff --git a/build_inplace b/build_inplace index 084bab0..350c270 100755 --- a/build_inplace +++ b/build_inplace @@ -1,5 +1,5 @@ #!/bin/sh -python setup.py build_ext --inplace "$@" -test -d build && python setup.py clean +python3 setup.py build_ext --inplace "$@" +test -d build && python3 setup.py clean diff --git a/media-dispatcher b/media-dispatcher index 4269843..ed7eb58 100755 --- a/media-dispatcher +++ b/media-dispatcher @@ -1,77 +1,77 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 if __name__ == '__main__': import mediaproxy import sys from application import log from application.process import process, ProcessError from argparse import ArgumentParser name = 'media-dispatcher' fullname = 'MediaProxy Dispatcher' description = 'MediaProxy Dispatcher component' process.configuration.user_directory = None process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__)) parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal') parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal') parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH') parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' if options.config_directory is not None: process.configuration.local_directory = options.config_directory if options.runtime_directory is not None: process.runtime.directory = options.runtime_directory try: process.runtime.create_directory() except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) if options.systemd: from systemd.journal import JournalHandler log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name)) log.capture_output() elif options.fork: try: process.daemonize(pidfile='{}.pid'.format(name)) except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) from mediaproxy.dispatcher import Dispatcher from mediaproxy.configuration import DispatcherConfig log.level.current = log.level.DEBUG if options.debug else DispatcherConfig.log_level if options.debug_memory: from application.debug.memory import memory_dump try: dispatcher = Dispatcher() except Exception as e: log.critical('Failed to create %s: %s' % (fullname, e)) if type(e) is not RuntimeError: log.exception() sys.exit(1) dispatcher.run() if options.debug_memory: memory_dump() diff --git a/media-relay b/media-relay index 5c59023..40bf086 100755 --- a/media-relay +++ b/media-relay @@ -1,120 +1,120 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 if __name__ == '__main__': import mediaproxy import errno import sys import subprocess from application import log from application.process import process, ProcessError from application.version import Version from argparse import ArgumentParser IP_FORWARD_FILE = '/proc/sys/net/ipv4/ip_forward' CONNTRACK_ACCT_FILE = '/proc/sys/net/netfilter/nf_conntrack_acct' KERNEL_VERSION_FILE = '/proc/sys/kernel/osrelease' name = 'media-relay' fullname = 'MediaProxy Relay' description = 'MediaProxy Relay component' process.configuration.user_directory = None process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory process.runtime.subdirectory = mediaproxy.mediaproxy_subdirectory parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(mediaproxy.__version__)) parser.add_argument('--systemd', action='store_true', help='run as a systemd simple service and log to journal') parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to the terminal') parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory ({})'.format(process.configuration.system_directory), metavar='PATH') parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' if options.config_directory is not None: process.configuration.local_directory = options.config_directory if options.runtime_directory is not None: process.runtime.directory = options.runtime_directory if not sys.platform.startswith('linux'): log.critical('Cannot start %s. A Linux host is required for operation.' % fullname) sys.exit(1) try: subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'}) except OSError as e: log.critical('Cannot start %s: failed to load the ip_tables kernel module: %s' % (fullname, e)) sys.exit(1) try: kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip()) except (OSError, IOError, ValueError): log.critical('Could not determine Linux kernel version') sys.exit(1) if kernel_version < Version(2, 6, 18): log.critical('Linux kernel version 2.6.18 or newer is required to run the media relay') sys.exit(1) try: ip_forward = bool(int(open(IP_FORWARD_FILE).read())) except (OSError, IOError, ValueError): ip_forward = False if not ip_forward: log.critical('IP forwarding is not available or not enabled (check %s)' % IP_FORWARD_FILE) sys.exit(1) try: with open(CONNTRACK_ACCT_FILE, 'w') as acct_file: acct_file.write('1') except (IOError, OSError) as e: if e.errno != errno.ENOENT: log.critical('Could not enable conntrack rule counters (check %s): %s' % (CONNTRACK_ACCT_FILE, e)) sys.exit(1) if options.systemd: from systemd.journal import JournalHandler log.set_handler(JournalHandler(SYSLOG_IDENTIFIER=name)) log.capture_output() elif options.fork: try: process.daemonize(pidfile='{}.pid'.format(name)) except ProcessError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, mediaproxy.__version__)) try: process.wait_for_network(wait_time=10, wait_message='Waiting for network to become available...') except KeyboardInterrupt: sys.exit(0) except RuntimeError as e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) try: from mediaproxy.relay import MediaRelay from mediaproxy.configuration import RelayConfig log.level.current = log.level.DEBUG if options.debug else RelayConfig.log_level if options.debug_memory: from application.debug.memory import memory_dump relay = MediaRelay() except Exception as e: log.critical('Failed to create %s: %s' % (fullname, e)) if type(e) is not RuntimeError: log.exception() sys.exit(1) relay.run() if options.debug_memory: memory_dump() diff --git a/mediaproxy/configuration/datatypes.py b/mediaproxy/configuration/datatypes.py index f741394..e9748d3 100644 --- a/mediaproxy/configuration/datatypes.py +++ b/mediaproxy/configuration/datatypes.py @@ -1,113 +1,113 @@ import re from application.configuration.datatypes import IPAddress, NetworkAddress, StringList from gnutls import crypto class DispatcherIPAddress(NetworkAddress): default_port = 25060 class DispatcherManagementAddress(NetworkAddress): default_port = 25061 class AccountingModuleList(StringList): _valid_backends = {'database', 'radius'} def __new__(cls, value): proposed_backends = set(StringList.__new__(cls, value)) return list(proposed_backends & cls._valid_backends) class DispatcherAddress(tuple): default_port = 25060 def __new__(cls, value): match = re.search(r"^(?P
.+?):(?P\d+)$", value) if match: address = str(match.group("address")) port = int(match.group("port")) else: address = value port = cls.default_port try: address = IPAddress(address) is_domain = False except ValueError: is_domain = True return tuple.__new__(cls, (address, port, is_domain)) class DispatcherAddressList(list): def __init__(cls, value): list.__init__(cls, (DispatcherAddress(dispatcher) for dispatcher in re.split(r'\s*,\s*|\s+', value))) class PortRange(object): """A port range in the form start:end with start and end being even numbers in the [1024, 65536] range""" def __init__(self, value): self.start, self.end = [int(p) for p in value.split(':', 1)] - allowed = xrange(1024, 65537, 2) + allowed = range(1024, 65537, 2) if not (self.start in allowed and self.end in allowed and self.start < self.end): raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value) def __repr__(self): return "%s('%d:%d')" % (self.__class__.__name__, self.start, self.end) class PositiveInteger(int): def __new__(cls, value): instance = int.__new__(cls, value) if instance < 1: raise ValueError("value must be a positive integer") return instance class SIPThorDomain(str): """A SIP Thor domain name or the keyword None""" def __new__(cls, name): if name is None: return None - elif not isinstance(name, basestring): + elif not isinstance(name, str): raise TypeError("domain name must be a string, unicode or None") if name.lower() == 'none': return None return name class X509NameValidator(crypto.X509Name): def __new__(cls, dname): if dname.lower() == 'none': return None return crypto.X509Name.__new__(cls, dname) def __init__(self, dname): str.__init__(self) pairs = [x.replace('\,', ',') for x in re.split(r'(?=limit), self.sessions.iteritems())] + obsolete = [k for k, s in filter(lambda k_s: k_s[1].expire_time and (now-k_s[1].expire_time>=limit), iter(self.sessions.items()))] if obsolete: [self.sessions.pop(call_id) for call_id in obsolete] log.warning('found %d expired sessions which were not removed during the last %d hours' % (len(obsolete), round(limit / 3600.0))) return KeepRunning def buildProtocol(self, addr): protocol = Factory.buildProtocol(self, addr) protocol.ip = addr.host protocol.logger = ProtocolLogger(name='relay {}'.format(addr.host)) protocol.logger.info('Connection established') return protocol def new_relay(self, relay): old_relay = self.relays.pop(relay.ip, None) if old_relay is not None: relay.logger.warning('Reconnected, closing old connection') reactor.callLater(0, old_relay.transport.connectionLost, failure.Failure(ConnectionReplaced('relay reconnected'))) self.relays[relay.ip] = relay timer = self.cleanup_timers.pop(relay.ip, None) if timer is not None: timer.cancel() defer = relay.send_command(Command('sessions')) defer.addCallback(self._cb_purge_sessions, relay.ip) def _cb_purge_sessions(self, result, relay_ip): - relay_sessions = cjson.decode(result) + relay_sessions = json.loads(result) relay_call_ids = [session['call_id'] for session in relay_sessions] - for session_id, session in self.sessions.items(): + for session_id, session in list(self.sessions.items()): if session.expire_time is None and session.relay_ip == relay_ip and session_id not in relay_call_ids: session.logger.warning('Relay does not have the session anymore, statistics are probably lost') if session.dialog_id is not None: self.dispatcher.opensips_management.end_dialog(session.dialog_id) del self.sessions[session_id] def send_command(self, command): session = self.sessions.get(command.call_id, None) if session and session.expire_time is None: relay = session.relay_ip if relay not in self.relays: session.logger.error('Request {0.name!r} failed: relay no longer connected'.format(command)) raise RelayError('Request {0.name!r} failed: relay no longer connected'.format(command)) return self.relays[relay].send_command(command) # We do not have a session for this call_id or the session is already expired if command.name == 'update': preferred_relay = command.parsed_headers.get('media_relay') - try_relays = deque(protocol for protocol in self.relays.itervalues() if protocol.active and protocol.ip != preferred_relay) + try_relays = deque(protocol for protocol in self.relays.values() if protocol.active and protocol.ip != preferred_relay) random.shuffle(try_relays) if preferred_relay is not None: protocol = self.relays.get(preferred_relay) if protocol is not None and protocol.active: try_relays.appendleft(protocol) else: log.warning('user requested media_relay %s is not available' % preferred_relay) defer = self._try_next(try_relays, command) defer.addCallback(self._add_session, try_relays, command) return defer elif command.name == 'remove' and session: # This is the remove we received for an expired session for which we triggered dialog termination del self.sessions[command.call_id] return 'removed' else: raise RelayError('Got {0.name!r} for unknown session {0.session_id}'.format(command)) def _add_session(self, result, try_relays, command): self.sessions[command.call_id] = RelaySession(try_relays[0], command) return result def _relay_error(self, failure, try_relays, command): failure.trap(RelayError) failed_relay = try_relays.popleft() failed_relay.logger.warning('The {0.name!r} request failed: {1.value}'.format(command, failure)) return self._try_next(try_relays, command) def _try_next(self, try_relays, command): if len(try_relays) == 0: raise RelayError('No suitable relay found') defer = try_relays[0].send_command(command) defer.addErrback(self._relay_error, try_relays, command) return defer def get_summary(self): command = Command('summary') - defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.itervalues()]) + defer = DeferredList([relay.send_command(command).addErrback(self._summary_error, command, relay) for relay in self.relays.values()]) defer.addCallback(self._got_summaries) return defer def _summary_error(self, failure, command, relay): relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) - return cjson.encode(dict(status='error', ip=relay.ip)) + return json.dumps(dict(status='error', ip=relay.ip)) def _got_summaries(self, results): return '[%s]' % ', '.join(result for succeeded, result in results if succeeded) def get_statistics(self): command = Command('sessions') - defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.itervalues()]) + defer = DeferredList([relay.send_command(command).addErrback(self._statistics_error, command, relay) for relay in self.relays.values()]) defer.addCallback(self._got_statistics) return defer def _statistics_error(self, failure, command, relay): relay.logger.error('The {0.name!r} request failed: {1.value}'.format(command, failure)) - return cjson.encode([]) + return json.loads([]) def _got_statistics(self, results): return '[%s]' % ', '.join(result[1:-1] for succeeded, result in results if succeeded and result != '[]') def connection_lost(self, relay): - if relay not in self.relays.itervalues(): + if relay not in iter(self.relays.values()): return if relay.authenticated: del self.relays[relay.ip] if self.shutting_down: if len(self.relays) == 0: self.defer.callback(None) else: self.cleanup_timers[relay.ip] = reactor.callLater(DispatcherConfig.cleanup_dead_relays_after, self._do_cleanup, relay.ip) def _do_cleanup(self, ip): log.debug('Cleaning up after old relay at %s' % ip) del self.cleanup_timers[ip] - for call_id in (call_id for call_id, session in self.sessions.items() if session.relay_ip == ip): + for call_id in (call_id for call_id, session in list(self.sessions.items()) if session.relay_ip == ip): del self.sessions[call_id] def shutdown(self): if self.shutting_down: return self.shutting_down = True - for timer in self.cleanup_timers.itervalues(): + for timer in self.cleanup_timers.values(): timer.cancel() if len(self.relays) == 0: retval = succeed(None) else: - for prot in self.relays.itervalues(): + for prot in self.relays.values(): prot.transport.loseConnection() self.defer = Deferred() retval = self.defer retval.addCallback(self._save_state) return retval def _save_state(self, result): - pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'w')) + pickle.dump(self.sessions, open(process.runtime.file('dispatcher_state'), 'wb')) class Dispatcher(object): def __init__(self): self.accounting = [__import__('mediaproxy.interfaces.accounting.%s' % mod.lower(), globals(), locals(), ['']).Accounting() for mod in set(DispatcherConfig.accounting)] self.cred = X509Credentials(cert_name='dispatcher') self.tls_context = TLSContext(self.cred) self.relay_factory = RelayFactory(self) dispatcher_addr, dispatcher_port = DispatcherConfig.listen - self.relay_listener = reactor.listenTLS(dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) + self.relay_listener = listenTLS(reactor, dispatcher_port, self.relay_factory, self.tls_context, interface=dispatcher_addr) self.opensips_factory = OpenSIPSControlFactory(self) socket_path = process.runtime.file(DispatcherConfig.socket_path) unlink(socket_path) self.opensips_listener = reactor.listenUNIX(socket_path, self.opensips_factory) self.opensips_management = opensips.ManagementInterface() self.management_factory = ManagementControlFactory(self) management_addr, management_port = DispatcherConfig.listen_management if DispatcherConfig.management_use_tls: - self.management_listener = reactor.listenTLS(management_port, self.management_factory, self.tls_context, interface=management_addr) + self.management_listener = listenTLS(reactor, management_port, self.management_factory, self.tls_context, interface=management_addr) else: - self.management_listener = reactor.listenTCP(management_port, self.management_factory, interface=management_addr) + self.management_listener = reactor.listenTCP( management_port, self.management_factory, interface=management_addr) def run(self): log.debug('Using {0.__class__.__name__}'.format(reactor)) process.signals.add_handler(signal.SIGHUP, self._handle_signal) process.signals.add_handler(signal.SIGINT, self._handle_signal) process.signals.add_handler(signal.SIGTERM, self._handle_signal) process.signals.add_handler(signal.SIGUSR1, self._handle_signal) for accounting_module in self.accounting: accounting_module.start() reactor.run(installSignalHandlers=False) def stop(self): reactor.callFromThread(self._shutdown) def send_command(self, command): return maybeDeferred(self.relay_factory.send_command, command) def update_statistics(self, session, stats): session.logger.info('statistics: {}'.format(stats)) if stats['start_time'] is not None: for accounting in self.accounting: try: accounting.do_accounting(stats) - except Exception, e: + except Exception as e: log.exception('An unhandled error occurred while doing accounting: %s' % e) def _handle_signal(self, signum, frame): if signum == signal.SIGUSR1: # toggle debugging if log.level.current != log.level.DEBUG: log.level.current = log.level.DEBUG log.info('Switched logging level to DEBUG') else: log.info('Switched logging level to {}'.format(DispatcherConfig.log_level)) log.level.current = DispatcherConfig.log_level else: # terminate program signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Hangup'} log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum))) self.stop() def _shutdown(self): defer = DeferredList([result for result in [self.opensips_listener.stopListening(), self.management_listener.stopListening(), self.relay_listener.stopListening()] if result is not None]) defer.addCallback(lambda x: self.opensips_factory.shutdown()) defer.addCallback(lambda x: self.management_factory.shutdown()) defer.addCallback(lambda x: self.relay_factory.shutdown()) defer.addCallback(lambda x: self._stop()) def _stop(self): for act in self.accounting: act.stop() reactor.stop() diff --git a/mediaproxy/headers.py b/mediaproxy/headers.py index c169331..15ba8ed 100644 --- a/mediaproxy/headers.py +++ b/mediaproxy/headers.py @@ -1,105 +1,105 @@ """Header encoding and decoding rules for communication between the dispatcher and relay components""" class EncodingError(Exception): pass class DecodingError(Exception): pass class MediaProxyHeaders(object): @classmethod def encode(cls, name, value): func_name = "encode_%s" % name if hasattr(cls, func_name): return getattr(cls, func_name)(value) else: return value @classmethod def decode(cls, name, value): func_name = "decode_%s" % name if hasattr(cls, func_name): return getattr(cls, func_name)(value) else: return value @staticmethod def encode_cseq(value): return str(value) @staticmethod def decode_cseq(value): try: return int(value) except ValueError: raise DecodingError("Not an integer: %s" % value) @staticmethod def encode_type(value): if value not in ["request", "reply"]: raise EncodingError('"type" header should be either "request" or "reply"') return value @staticmethod def decode_type(value): if value not in ["request", "reply"]: raise DecodingError('"type" header should be either "request" or "reply"') return value @staticmethod def encode_media(value): try: - return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.iteritems()]) for type, ip, port, direction, parameters in value) + return ','.join(':'.join([type, ip, str(port), direction] + ['%s=%s' % (k, v) for k, v in parameters.items()]) for type, ip, port, direction, parameters in value) except: raise EncodingError("Ill-formatted media information") @staticmethod def decode_media(value): try: streams = [] for stream_data in (data for data in value.split(",") if data): stream_data = stream_data.split(":") type, ip, port, direction = stream_data[:4] parameters = dict(param.split("=") for param in stream_data[4:] if param) streams.append((type, ip, int(port), direction, parameters)) return streams except: raise DecodingError("Ill-formatted media header") class CodingDict(dict): def __init__(self, *args, **kwargs): if not args and not kwargs: it = [] elif kwargs: - it = kwargs.iteritems() + it = iter(kwargs.items()) elif isinstance(args[0], dict): - it = args[0].iteritems() + it = iter(args[0].items()) else: try: it = iter(args[0]) except: dict.__init__(self, *args, **kwargs) return dict.__init__(self) for key, value in it: self.__setitem__(key, value) class EncodingDict(CodingDict): def __setitem__(self, key, value): encoded_value = MediaProxyHeaders.encode(key, value) dict.__setitem__(self, key, encoded_value) class DecodingDict(CodingDict): def __setitem__(self, key, value): decoded_value = MediaProxyHeaders.decode(key, value) dict.__setitem__(self, key, decoded_value) diff --git a/mediaproxy/interfaces/accounting/database.py b/mediaproxy/interfaces/accounting/database.py index 4c5c3ad..6449719 100644 --- a/mediaproxy/interfaces/accounting/database.py +++ b/mediaproxy/interfaces/accounting/database.py @@ -1,86 +1,86 @@ """Implementation of database accounting""" -import cjson +import json from application import log from application.python.queue import EventQueue from sqlobject import SQLObject, connectionForURI, sqlhub from sqlobject import StringCol, BLOBCol, DatabaseIndex from sqlobject.dberrors import DatabaseError, ProgrammingError, OperationalError from mediaproxy.configuration import DatabaseConfig if not DatabaseConfig.dburi: raise RuntimeError('Database accounting is enabled, but the database URI is not specified in config.ini') connection = connectionForURI(DatabaseConfig.dburi) sqlhub.processConnection = connection class MediaSessions(SQLObject): class sqlmeta: table = DatabaseConfig.sessions_table createSQL = {'mysql': 'ALTER TABLE %s ENGINE MyISAM' % DatabaseConfig.sessions_table} cacheValues = False call_id = StringCol(length=255, dbName=DatabaseConfig.callid_column, notNone=True) from_tag = StringCol(length=64, dbName=DatabaseConfig.fromtag_column, notNone=True) to_tag = StringCol(length=64, dbName=DatabaseConfig.totag_column) info = BLOBCol(length=2**24-1, dbName=DatabaseConfig.info_column) # 2**24-1 makes it a mediumblob in mysql, that can hold 16 million bytes # Indexes callid_idx = DatabaseIndex('call_id', 'from_tag', 'to_tag', unique=True) try: MediaSessions.createTable(ifNotExists=True) except OperationalError as e: log.error("cannot create the `%s' table: %s" % (DatabaseConfig.sessions_table, e)) log.info("please make sure that the `%s' user has the CREATE and ALTER rights on the `%s' database" % (connection.user, connection.db)) log.info('then restart the dispatcher, or you can create the table yourself using the following definition:') log.info('----------------- >8 -----------------') sql, constraints = MediaSessions.createTableSQL() statements = ';\n'.join([sql] + constraints) + ';' log.info(statements) log.info('----------------- >8 -----------------') # raise RuntimeError(str(e)) class Accounting(object): def __init__(self): self.handler = DatabaseAccounting() def start(self): self.handler.start() def do_accounting(self, stats): self.handler.put(stats) def stop(self): self.handler.stop() self.handler.join() class DatabaseAccounting(EventQueue): def __init__(self): EventQueue.__init__(self, self.do_accounting) def do_accounting(self, stats): sqlrepr = connection.sqlrepr names = ', '.join([DatabaseConfig.callid_column, DatabaseConfig.fromtag_column, DatabaseConfig.totag_column, DatabaseConfig.info_column]) - values = ', '.join((sqlrepr(v) for v in [stats['call_id'], stats['from_tag'], stats['to_tag'], cjson.encode(stats)])) + values = ', '.join((sqlrepr(v) for v in [stats['call_id'], stats['from_tag'], stats['to_tag'], json.dumps(stats)])) q = 'INSERT INTO %s (%s) VALUES (%s)' % (DatabaseConfig.sessions_table, names, values) try: try: connection.query(q) - except ProgrammingError, e: + except ProgrammingError as e: try: MediaSessions.createTable(ifNotExists=True) except OperationalError: raise e else: connection.query(q) except DatabaseError as e: log.error('failed to insert record into database: %s' % e) diff --git a/mediaproxy/interfaces/accounting/radius.py b/mediaproxy/interfaces/accounting/radius.py index 04c68aa..8a35239 100644 --- a/mediaproxy/interfaces/accounting/radius.py +++ b/mediaproxy/interfaces/accounting/radius.py @@ -1,128 +1,131 @@ """Implementation of RADIUS accounting""" from application import log from application.process import process from application.python.queue import EventQueue import pyrad.client import pyrad.dictionary from mediaproxy.configuration import RadiusConfig try: from pyrad.dictfile import DictFile except ImportError: # helper class to make pyrad support the $INCLUDE statement in dictionary files class RadiusDictionaryFile(object): def __init__(self, base_file_name): self.file_names = [base_file_name] self.fd_stack = [open(base_file_name)] def readlines(self): while True: line = self.fd_stack[-1].readline() if line: if line.startswith('$INCLUDE'): file_name = line.rstrip('\n').split(None, 1)[1] if file_name not in self.file_names: self.file_names.append(file_name) self.fd_stack.append(open(file_name)) continue else: yield line else: self.fd_stack.pop() if len(self.fd_stack) == 0: return else: del DictFile class RadiusDictionaryFile(str): pass class Accounting(object): def __init__(self): self.handler = RadiusAccounting() def start(self): self.handler.start() def do_accounting(self, stats): self.handler.put(stats) def stop(self): self.handler.stop() self.handler.join() class RadiusAccounting(EventQueue, pyrad.client.Client): def __init__(self): main_config_file = process.configuration.file(RadiusConfig.config_file) if main_config_file is None: raise RuntimeError('Cannot find the radius configuration file: %r' % RadiusConfig.config_file) try: config = dict(line.rstrip('\n').split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith('#')) secrets = dict(line.rstrip('\n').split(None, 1) for line in open(config['servers']) if len(line.split(None, 1)) == 2 and not line.startswith('#')) server = config['acctserver'] try: server, acctport = server.split(':') acctport = int(acctport) except ValueError: log.info('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary) acctport = 1813 log.info('Using RADIUS server at %s:%d' % (server, acctport)) secret = secrets[server] log.info("Using RADIUS dictionary file %s" % config['dictionary']) dicts = [RadiusDictionaryFile(config['dictionary'])] if RadiusConfig.additional_dictionary: additional_dictionary = process.configuration.file(RadiusConfig.additional_dictionary) if additional_dictionary: log.info("Using additional RADIUS dictionary file %s" % RadiusConfig.additional_dictionary) dicts.append(RadiusDictionaryFile(additional_dictionary)) else: log.warning('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary) raddict = pyrad.dictionary.Dictionary(*dicts) timeout = int(config['radius_timeout']) retries = int(config['radius_retries']) except Exception: log.critical('cannot read the RADIUS configuration file %s' % RadiusConfig.config_file) raise pyrad.client.Client.__init__(self, server, 1812, acctport, 3799, secret, raddict) self.timeout = timeout self.retries = retries if 'bindaddr' in config and config['bindaddr'] != '*': self.bind((config['bindaddr'], 0)) EventQueue.__init__(self, self.do_accounting) def do_accounting(self, stats): attrs = {} attrs['Acct-Status-Type'] = 'Update' attrs['User-Name'] = 'mediaproxy@default' attrs['Acct-Session-Id'] = stats['call_id'] attrs['Acct-Session-Time'] = stats['duration'] attrs['Acct-Input-Octets'] = sum(stream_stats['caller_bytes'] for stream_stats in stats['streams']) attrs['Acct-Output-Octets'] = sum(stream_stats['callee_bytes'] for stream_stats in stats['streams']) attrs['Sip-From-Tag'] = stats['from_tag'] attrs['Sip-To-Tag'] = stats['to_tag'] or '' attrs['NAS-IP-Address'] = stats['streams'][0]['caller_local'].split(':')[0] attrs['Sip-User-Agents'] = (stats['caller_ua'] + '+' + stats['callee_ua'])[:253] attrs['Sip-Applications'] = ', '.join(sorted(set(stream['media_type'] for stream in stats['streams'] if stream['start_time'] != stream['end_time'])))[:253] attrs['Media-Codecs'] = ', '.join(stream['caller_codec'] for stream in stats['streams'])[:253] if stats['timed_out'] and not stats.get('all_streams_ice', False): attrs['Media-Info'] = 'timeout' elif stats.get('all_streams_ice', False): attrs['Media-Info'] = 'ICE session' else: attrs['Media-Info'] = '' for stream in stats['streams']: if stream['post_dial_delay'] is not None: attrs['Acct-Delay-Time'] = int(stream['post_dial_delay']) break + if isinstance(self.secret, str): + scr_bn = self.secret.encode('utf-8') + self.secret = scr_bn try: self.SendPacket(self.CreateAcctPacket(**attrs)) - except Exception, e: + except Exception as e: log.error('Failed to send radius accounting record: %s' % e) diff --git a/mediaproxy/interfaces/opensips.py b/mediaproxy/interfaces/opensips.py index ce2f3cf..d0e1de4 100644 --- a/mediaproxy/interfaces/opensips.py +++ b/mediaproxy/interfaces/opensips.py @@ -1,240 +1,236 @@ import json import socket -import urlparse +import urllib.parse from abc import ABCMeta, abstractmethod, abstractproperty from application import log from application.python.types import Singleton from application.process import process from application.system import unlink from random import getrandbits from twisted.internet import reactor, defer from twisted.internet.protocol import DatagramProtocol from twisted.python.failure import Failure from mediaproxy.configuration import OpenSIPSConfig class Error(Exception): pass class TimeoutError(Error): pass class OpenSIPSError(Error): pass class NegativeReplyError(OpenSIPSError): def __init__(self, code, message): super(NegativeReplyError, self).__init__(code, message) self.code = code self.message = message def __repr__(self): return '{0.__class__.__name__}({0.code!r}, {0.message!r})'.format(self) def __str__(self): return '[{0.code}] {0.message}'.format(self) -class Request(object): - __metaclass__ = ABCMeta - +class Request(object, metaclass=ABCMeta): method = abstractproperty() @abstractmethod def __init__(self, *args): self.id = '{:x}'.format(getrandbits(32)) self.args = list(args) self.deferred = defer.Deferred() @property def __data__(self): return dict(jsonrpc='2.0', id=self.id, method=self.method, params=self.args) @abstractmethod def process_response(self, response): raise NotImplementedError # noinspection PyAbstractClass class BooleanRequest(Request): """A request that returns True if successful, False otherwise""" def process_response(self, response): return not isinstance(response, Failure) class AddressReload(BooleanRequest): method = 'address_reload' def __init__(self): super(AddressReload, self).__init__() class DomainReload(BooleanRequest): method = 'domain_reload' def __init__(self): super(DomainReload, self).__init__() class EndDialog(BooleanRequest): method = 'dlg_end_dlg' def __init__(self, dialog_id): super(EndDialog, self).__init__(dialog_id) class RefreshWatchers(BooleanRequest): method = 'refresh_watchers' def __init__(self, account, refresh_type): super(RefreshWatchers, self).__init__('sip:{}'.format(account), 'presence', refresh_type) class UpdateSubscriptions(BooleanRequest): method = 'rls_update_subscriptions' def __init__(self, account): super(UpdateSubscriptions, self).__init__('sip:{}'.format(account)) class GetOnlineDevices(Request): method = 'ul_show_contact' def __init__(self, account): super(GetOnlineDevices, self).__init__(OpenSIPSConfig.location_table, account) def process_response(self, response): if isinstance(response, Failure): if response.type is NegativeReplyError and response.value.code == 404: return [] return response - return [ContactData(contact) for contact in response[u'Contacts']] + return [ContactData(contact) for contact in response['Contacts']] class ContactData(dict): - __fields__ = {u'contact', u'expires', u'received', u'user_agent'} + __fields__ = {'contact', 'expires', 'received', 'user_agent'} def __init__(self, data): - super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace(u'-', u'_'), value) for key, value in data.iteritems()) if key in self.__fields__}) - self.setdefault(u'user_agent', None) - if u'received' in self: - parsed_received = urlparse.parse_qs(self[u'received']) - if u'target' in parsed_received: - self[u'NAT_contact'] = parsed_received[u'target'][0] + super(ContactData, self).__init__({key: value for key, value in ((key.lower().replace('-', '_'), value) for key, value in data.items()) if key in self.__fields__}) + self.setdefault('user_agent', None) + if 'received' in self: + parsed_received = urllib.parse.parse_qs(self['received']) + if 'target' in parsed_received: + self['NAT_contact'] = parsed_received['target'][0] else: - self[u'NAT_contact'] = self[u'received'] - del self[u'received'] + self['NAT_contact'] = self['received'] + del self['received'] else: - self[u'NAT_contact'] = self[u'contact'] + self['NAT_contact'] = self['contact'] class UNIXSocketProtocol(DatagramProtocol): noisy = False def datagramReceived(self, data, address): log.debug('Got MI response: {}'.format(data)) try: response = json.loads(data) except ValueError: code, _, message = data.partition(' ') try: code = int(code) except ValueError: log.error('MI response from OpenSIPS cannot be parsed (neither JSON nor status reply)') return # we got one of the 'code message' type of replies. This means either parsing error or internal error in OpenSIPS. # if we only have one request pending, we can associate the response with it, otherwise is impossible to tell to # which request the response corresponds. The failed request will fail with timeout later. if len(self.transport.requests) == 1: _, request = self.transport.requests.popitem() request.deferred.errback(Failure(NegativeReplyError(code, message))) log.error('MI request {.method} failed with: {} {}'.format(request, code, message)) else: log.error('Got MI status reply from OpenSIPS that cannot be associated with a request: {!r}'.format(data)) else: try: request_id = response['id'] except KeyError: log.error('MI JSON response from OpenSIPS lacks id field') return if request_id not in self.transport.requests: log.error('MI JSON response from OpenSIPS has unknown id: {!r}'.format(request_id)) return request = self.transport.requests.pop(request_id) if 'result' in response: request.deferred.callback(response['result']) elif 'error' in response: log.error('MI request {0.method} failed with: {1[error][code]} {1[error][message]}'.format(request, response)) request.deferred.errback(Failure(NegativeReplyError(response['error']['code'], response['error']['message']))) else: log.error('Invalid MI JSON response from OpenSIPS') request.deferred.errback(Failure(OpenSIPSError('Invalid MI JSON response from OpenSIPS'))) class UNIXSocketConnection(object): timeout = 3 def __init__(self): socket_path = process.runtime.file('opensips.sock') unlink(socket_path) self.path = socket_path self.transport = reactor.listenUNIXDatagram(self.path, UNIXSocketProtocol()) self.transport.requests = {} reactor.addSystemEventTrigger('during', 'shutdown', self.close) def close(self): - for request in self.transport.requests.values(): + for request in list(self.transport.requests.values()): if not request.deferred.called: request.deferred.errback(Error('shutting down')) self.transport.requests.clear() self.transport.stopListening() unlink(self.path) def send(self, request): try: self.transport.write(json.dumps(request.__data__), OpenSIPSConfig.socket_path) except socket.error as e: log.error("cannot write request to %s: %s" % (OpenSIPSConfig.socket_path, e[1])) request.deferred.errback(Failure(Error("Cannot send MI request %s to OpenSIPS" % request.method))) else: self.transport.requests[request.id] = request request.deferred.addBoth(request.process_response) reactor.callLater(self.timeout, self._did_timeout, request) log.debug('Send MI request: {}'.format(request.__data__)) return request.deferred def _did_timeout(self, request): if not request.deferred.called: request.deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout"))) self.transport.requests.pop(request.id) -class ManagementInterface(object): - __metaclass__ = Singleton - +class ManagementInterface(object, metaclass=Singleton): def __init__(self): self.connection = UNIXSocketConnection() def reload_domains(self): return self.connection.send(DomainReload()) def reload_addresses(self): return self.connection.send(AddressReload()) def end_dialog(self, dialog_id): return self.connection.send(EndDialog(dialog_id)) def get_online_devices(self, account): return self.connection.send(GetOnlineDevices(account)) def refresh_watchers(self, account, refresh_type): return self.connection.send(RefreshWatchers(account, refresh_type)) def update_subscriptions(self, account): return self.connection.send(UpdateSubscriptions(account)) diff --git a/mediaproxy/mediacontrol.py b/mediaproxy/mediacontrol.py index 3a6f1e2..ed487ee 100644 --- a/mediaproxy/mediacontrol.py +++ b/mediaproxy/mediacontrol.py @@ -1,827 +1,829 @@ import hashlib import struct from application import log from base64 import b64encode as base64_encode from itertools import chain from collections import deque from operator import attrgetter from time import time from twisted.internet import reactor from twisted.internet.interfaces import IReadDescriptor from twisted.internet.protocol import DatagramProtocol from twisted.internet.error import CannotListenError from twisted.python.log import Logger -from zope.interface import implements +from zope.interface import implementer from mediaproxy.configuration import RelayConfig from mediaproxy.interfaces.system import _conntrack from mediaproxy.iputils import is_routable_ip from mediaproxy.scheduler import RecurrentCall, KeepRunning UDP_TIMEOUT_FILE = '/proc/sys/net/ipv4/netfilter/ip_conntrack_udp_timeout_stream' rtp_payloads = { 0: 'G711u', 1: '1016', 2: 'G721', 3: 'GSM', 4: 'G723', 5: 'DVI4', 6: 'DVI4', 7: 'LPC', 8: 'G711a', 9: 'G722', 10: 'L16', 11: 'L16', 14: 'MPA', 15: 'G728', 18: 'G729', 25: 'CelB', 26: 'JPEG', 28: 'nv', 31: 'H261', 32: 'MPV', 33: 'MP2T', 34: 'H263' } class RelayPortsExhaustedError(Exception): pass if RelayConfig.relay_ip is None: raise RuntimeError('Could not determine default host IP; either add default route or specify relay IP manually') class SessionLogger(log.ContextualLogger): def __init__(self, session): super(SessionLogger, self).__init__(logger=log.get_logger()) # use the main logger as backend self.session_id = session.call_id def apply_context(self, message): return '[session {0.session_id}] {1}'.format(self, message) if message != '' else '' class Address(object): """Representation of an endpoint address""" def __init__(self, host, port, in_use=True, got_rtp=False): self.host = host self.port = port - self.in_use = self.__nonzero__() and in_use + self.in_use = self.__bool__() and in_use self.got_rtp = got_rtp def __len__(self): return 2 - def __nonzero__(self): + def __bool__(self): return None not in (self.host, self.port) def __getitem__(self, index): return (self.host, self.port)[index] def __contains__(self, item): return item in (self.host, self.port) def __iter__(self): yield self.host yield self.port def __str__(self): - return self.__nonzero__() and ('%s:%d' % (self.host, self.port)) or 'Unknown' + return self.__bool__() and ('%s:%d' % (self.host, self.port)) or 'Unknown' def __repr__(self): return '%s(%r, %r, in_use=%r, got_rtp=%r)' % (self.__class__.__name__, self.host, self.port, self.in_use, self.got_rtp) def forget(self): self.host, self.port, self.in_use, self.got_rtp = None, None, False, False @property def unknown(self): return None in (self.host, self.port) @property def obsolete(self): - return self.__nonzero__() and not self.in_use + return self.__bool__() and not self.in_use class Counters(dict): def __add__(self, other): n = Counters(self) - for k, v in other.iteritems(): + for k, v in other.items(): n[k] += v return n def __iadd__(self, other): - for k, v in other.iteritems(): + for k, v in other.items(): self[k] += v return self @property def caller_bytes(self): return self['caller_bytes'] @property def callee_bytes(self): return self['callee_bytes'] @property def caller_packets(self): return self['caller_packets'] @property def callee_packets(self): return self['callee_packets'] @property def relayed_bytes(self): return self['caller_bytes'] + self['callee_bytes'] @property def relayed_packets(self): return self['caller_packets'] + self['callee_packets'] class StreamListenerProtocol(DatagramProtocol): noisy = False def __init__(self): self.cb_func = None self.sdp = None self.send_packet_count = 0 self.stun_queue = [] - def datagramReceived(self, data, (host, port)): + def datagramReceived(self, data, addr): + (host, port) = addr if self.cb_func is not None: self.cb_func(host, port, data) def set_remote_sdp(self, ip, port): if is_routable_ip(ip): self.sdp = ip, port else: self.sdp = None def send(self, data, is_stun, ip=None, port=None): if is_stun: self.stun_queue.append(data) if ip is None or port is None: # this means that we have not received any packets from this host yet, # so we have not learnt its address if self.sdp is None: # we can't do anything if we haven't received the SDP IP yet or # it was in a private range return ip, port = self.sdp # we learnt the IP, empty the STUN packets queue if self.stun_queue: for data in self.stun_queue: self.transport.write(data, (ip, port)) self.stun_queue = [] if not is_stun: if not self.send_packet_count % RelayConfig.userspace_transmit_every: self.transport.write(data, (ip, port)) self.send_packet_count += 1 def _stun_test(data): # Check if data is a STUN request and if it's a binding request if len(data) < 20: return False, False msg_type, msg_len, magic = struct.unpack('!HHI', data[:8]) if msg_type & 0xc == 0 and magic == 0x2112A442: if msg_type == 0x0001: return True, True else: return True, False else: return False, False class MediaSubParty(object): def __init__(self, substream, listener): self.substream = substream self.logger = substream.logger self.listener = listener self.listener.protocol.cb_func = self.got_data self.remote = Address(None, None) host = self.listener.protocol.transport.getHost() self.local = Address(host.host, host.port) self.timer = None self.codec = 'Unknown' self.got_stun_probing = False self.reset() def reset(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) self.remote.in_use = False # keep remote address around but mark it as obsolete self.remote.got_rtp = False self.got_stun_probing = False self.listener.protocol.send_packet_count = 0 def before_hold(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = reactor.callLater(RelayConfig.on_hold_timeout, self.substream.expired, 'on hold timeout', RelayConfig.on_hold_timeout) def after_hold(self): if self.timer and self.timer.active(): self.timer.cancel() if not self.remote.in_use: self.timer = reactor.callLater(RelayConfig.stream_timeout, self.substream.expired, 'no-traffic timeout', RelayConfig.stream_timeout) def got_data(self, host, port, data): if (host, port) == tuple(self.remote): if self.remote.obsolete: # the received packet matches the previously used IP/port, # which has been made obsolete, so ignore it return else: if self.remote.in_use: # the received packet is different than the recorded IP/port, # so we will discard it return # we have learnt the remote IP/port self.remote.host, self.remote.port = host, port self.remote.in_use = True self.logger.info('discovered peer: %s' % self.substream.stream) is_stun, is_binding_request = _stun_test(data) self.substream.send_data(self, data, is_stun) if not self.remote.got_rtp and not is_stun: # This is the first RTP packet received self.remote.got_rtp = True if self.timer: if self.timer.active(): self.timer.cancel() self.timer = None if self.codec == 'Unknown' and self.substream is self.substream.stream.rtp: try: - pt = ord(data[1]) & 127 + pt = data[1] & 127 except IndexError: pass else: if pt > 95: self.codec = 'Dynamic(%d)' % pt elif pt in rtp_payloads: self.codec = rtp_payloads[pt] else: self.codec = 'Unknown(%d)' % pt self.substream.check_create_conntrack() if is_binding_request: self.got_stun_probing = True def cleanup(self): if self.timer and self.timer.active(): self.timer.cancel() self.timer = None self.listener.protocol.cb_func = None self.substream = None class MediaSubStream(object): def __init__(self, stream, listener_caller, listener_callee): self.stream = stream self.logger = stream.logger self.forwarding_rule = None self.caller = MediaSubParty(self, listener_caller) self.callee = MediaSubParty(self, listener_callee) self._counters = Counters(caller_bytes=0, callee_bytes=0, caller_packets=0, callee_packets=0) @property def counters(self): """Accumulated counters from all the forwarding rules the stream had""" if self.forwarding_rule is None: return self._counters else: try: return self._counters + self.forwarding_rule.counters except _conntrack.Error: return self._counters def _stop_relaying(self): if self.forwarding_rule is not None: try: self._counters += self.forwarding_rule.counters except _conntrack.Error: pass self.forwarding_rule = None def reset(self, party): if party == 'caller': self.caller.reset() else: self.callee.reset() self._stop_relaying() def check_create_conntrack(self): if self.stream.first_media_time is None: self.stream.first_media_time = time() if self.caller.remote.in_use and self.caller.remote.got_rtp and self.callee.remote.in_use and self.callee.remote.got_rtp: self.forwarding_rule = _conntrack.ForwardingRule(self.caller.remote, self.caller.local, self.callee.remote, self.callee.local, self.stream.session.mark) self.forwarding_rule.expired_func = self.conntrack_expired def send_data(self, source, data, is_stun): if source is self.caller: dest = self.callee else: dest = self.caller if dest.remote: # if we have already learnt the remote address of the destination, use that ip, port = dest.remote.host, dest.remote.port dest.listener.protocol.send(data, is_stun, ip, port) else: # otherwise use the IP/port specified in the SDP, if public dest.listener.protocol.send(data, is_stun) def conntrack_expired(self): try: timeout_wait = int(open(UDP_TIMEOUT_FILE).read()) except: timeout_wait = 0 self.expired('conntrack timeout', timeout_wait) def expired(self, reason, timeout_wait): self._stop_relaying() self.stream.substream_expired(self, reason, timeout_wait) def cleanup(self): self.caller.cleanup() self.callee.cleanup() self._stop_relaying() self.stream = None class MediaParty(object): def __init__(self, stream): self.manager = stream.session.manager self.logger = stream.logger self._remote_sdp = None self.is_on_hold = False self.uses_ice = False while True: self.listener_rtp = None self.ports = port_rtp, port_rtcp = self.manager.get_ports() try: self.listener_rtp = reactor.listenUDP(port_rtp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) self.listener_rtcp = reactor.listenUDP(port_rtcp, StreamListenerProtocol(), interface=RelayConfig.relay_ip) except CannotListenError: if self.listener_rtp is not None: self.listener_rtp.stopListening() self.manager.set_bad_ports(self.ports) self.logger.warning('Cannot use port pair %d/%d' % self.ports) else: break def _get_remote_sdp(self): return self._remote_sdp - def _set_remote_sdp(self, (ip, port)): + def _set_remote_sdp(self, addr): + (ip, port) = addr self._remote_sdp = ip, port self.listener_rtp.protocol.set_remote_sdp(ip, port) remote_sdp = property(_get_remote_sdp, _set_remote_sdp) def cleanup(self): self.listener_rtp.stopListening() self.listener_rtcp.stopListening() self.manager.free_ports(self.ports) self.manager = None class MediaStream(object): def __init__(self, session, media_type, media_ip, media_port, direction, media_parameters, initiating_party): self.is_alive = True self.session = session # type: Session self.logger = session.logger self.media_type = media_type self.caller = MediaParty(self) self.callee = MediaParty(self) self.rtp = MediaSubStream(self, self.caller.listener_rtp, self.callee.listener_rtp) self.rtcp = MediaSubStream(self, self.caller.listener_rtcp, self.callee.listener_rtcp) getattr(self, initiating_party).remote_sdp = (media_ip, media_port) getattr(self, initiating_party).uses_ice = (media_parameters.get('ice', 'no') == 'yes') self.check_hold(initiating_party, direction, media_ip) self.create_time = time() self.first_media_time = None self.start_time = None self.end_time = None self.status = 'active' self.timeout_wait = 0 def __str__(self): if self.caller.remote_sdp is None: src = 'Unknown' else: src = '%s:%d' % self.caller.remote_sdp if self.caller.is_on_hold: src += ' ON HOLD' if self.caller.uses_ice: src += ' (ICE)' if self.callee.remote_sdp is None: dst = 'Unknown' else: dst = '%s:%d' % self.callee.remote_sdp if self.callee.is_on_hold: dst += ' ON HOLD' if self.callee.uses_ice: dst += ' (ICE)' rtp = self.rtp rtcp = self.rtcp return '(%s) %s (RTP: %s, RTCP: %s) <-> %s <-> %s <-> %s (RTP: %s, RTCP: %s)' % ( self.media_type, src, rtp.caller.remote, rtcp.caller.remote, rtp.caller.local, rtp.callee.local, dst, rtp.callee.remote, rtcp.callee.remote) @property def counters(self): return self.rtp.counters + self.rtcp.counters @property def is_on_hold(self): return self.caller.is_on_hold or self.callee.is_on_hold def check_hold(self, party, direction, ip): previous_hold = self.is_on_hold party = getattr(self, party) if direction == 'sendonly' or direction == 'inactive': party.is_on_hold = True elif ip == '0.0.0.0': party.is_on_hold = True else: party.is_on_hold = False if previous_hold and not self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = 'active' subparty.after_hold() if not previous_hold and self.is_on_hold: for substream in [self.rtp, self.rtcp]: for subparty in [substream.caller, substream.callee]: self.status = 'on hold' subparty.before_hold() def reset(self, party, media_ip, media_port): self.rtp.reset(party) self.rtcp.reset(party) getattr(self, party).remote_sdp = (media_ip, media_port) def substream_expired(self, substream, reason, timeout_wait): if substream is self.rtp and self.caller.uses_ice and self.callee.uses_ice: reason = 'unselected ICE candidate' self.logger.info('RTP stream expired: {}'.format(reason)) if not substream.caller.got_stun_probing and not substream.callee.got_stun_probing: self.logger.info('unselected ICE candidate, but no STUN was received') if substream is self.rtcp: # Forget about the remote addresses, this will cause any # re-occurrence of the same traffic to be forwarded again substream.caller.remote.forget() substream.caller.listener.protocol.send_packet_count = 0 substream.callee.remote.forget() substream.callee.listener.protocol.send_packet_count = 0 else: session = self.session self.cleanup(reason) self.timeout_wait = timeout_wait session.stream_expired(self) def cleanup(self, status='closed'): if self.is_alive: self.is_alive = False self.status = status self.caller.cleanup() self.callee.cleanup() self.rtp.cleanup() self.rtcp.cleanup() self.session = None self.end_time = time() class Session(object): def __init__(self, manager, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media_list, is_downstream, is_caller_cseq, mark=0): self.manager = manager self.dispatcher = dispatcher - self.session_id = base64_encode(hashlib.md5(call_id).digest()).rstrip('=') + self.session_id = base64_encode(hashlib.md5(call_id.encode()).digest()).rstrip(b'=') self.call_id = call_id self.from_tag = from_tag self.to_tag = None self.mark = mark self.from_uri = from_uri self.to_uri = to_uri self.caller_ua = None self.callee_ua = None self.cseq = None self.previous_cseq = None self.streams = {} self.start_time = None self.end_time = None self.logger = SessionLogger(self) self.logger.info('created: from-tag {0.from_tag})'.format(self)) self.update_media(cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq) def update_media(self, cseq, to_tag, user_agent, media_list, is_downstream, is_caller_cseq): if self.cseq is None: old_cseq = (0, 0) else: old_cseq = self.cseq if is_caller_cseq: cseq = (cseq, old_cseq[1]) if self.to_tag is None and to_tag is not None: self.to_tag = to_tag else: cseq = (old_cseq[0], cseq) if is_downstream: party = 'caller' if self.caller_ua is None: self.caller_ua = user_agent else: party = 'callee' if self.callee_ua is None: self.callee_ua = user_agent if self.cseq is None or cseq > self.cseq: if not media_list: return self.logger.info('got SDP offer') self.streams[cseq] = new_streams = [] if self.cseq is None: old_streams = [] else: old_streams = self.streams[self.cseq] for media_type, media_ip, media_port, media_direction, media_parameters in media_list: for old_stream in old_streams: old_remote = getattr(old_stream, party).remote_sdp if old_remote is not None: old_ip, old_port = old_remote else: old_ip, old_port = None, None if old_stream.is_alive and old_stream.media_type == media_type and ((media_ip, media_port) in ((old_ip, old_port), ('0.0.0.0', old_port), (old_ip, 0))): stream = old_stream stream.check_hold(party, media_direction, media_ip) if media_port == 0: self.logger.info('disabled stream: %s', stream) else: self.logger.info('retained stream: %s', stream) break else: stream = MediaStream(self, media_type, media_ip, media_port, media_direction, media_parameters, party) self.logger.info('proposed stream: %s' % stream) if media_port == 0: stream.cleanup() new_streams.append(stream) if self.previous_cseq is not None: for stream in self.streams[self.previous_cseq]: if stream not in self.streams[self.cseq] + new_streams: stream.cleanup() self.previous_cseq = self.cseq self.cseq = cseq elif self.cseq == cseq: self.logger.info('got SDP answer') now = time() if self.start_time is None: self.start_time = now current_streams = self.streams[cseq] for stream in current_streams: if stream.start_time is None: stream.start_time = now if to_tag is not None and not media_list: return if len(media_list) < len(current_streams): for stream in current_streams[len(media_list):]: self.logger.info('removed! stream: %s' % stream) stream.cleanup('rejected') for stream, (media_type, media_ip, media_port, media_direction, media_parameters) in zip(current_streams, media_list): if stream.media_type != media_type: raise ValueError('Media types do not match: %r and %r' % (stream.media_type, media_type)) if media_port == 0: if stream.is_alive: self.logger.info('rejected stream: %s' % stream) else: self.logger.info('disabled stream: %s' % stream) stream.cleanup('rejected') continue stream.check_hold(party, media_direction, media_ip) party_info = getattr(stream, party) party_info.uses_ice = (media_parameters.get('ice', 'no') == 'yes') if party_info.remote_sdp is None or party_info.remote_sdp[0] == '0.0.0.0': party_info.remote_sdp = (media_ip, media_port) self.logger.info('accepted stream: %s' % stream) else: if party_info.remote_sdp[1] != media_port or (party_info.remote_sdp[0] != media_ip != '0.0.0.0'): stream.reset(party, media_ip, media_port) self.logger.info('updating stream: %s' % stream) else: self.logger.info('retained stream: %s' % stream) if self.previous_cseq is not None: for stream in [stream for stream in self.streams[self.previous_cseq] if stream not in current_streams]: self.logger.info('removing stream: %s' % stream) stream.cleanup() else: self.logger.info('got old CSeq %d:%d, ignoring' % cseq) def get_local_media(self, is_downstream, cseq, is_caller_cseq): if is_caller_cseq: pos = 0 else: pos = 1 try: - cseq = max(key for key in self.streams.keys() if key[pos] == cseq) + cseq = max(key for key in list(self.streams.keys()) if key[pos] == cseq) except ValueError: return None if is_downstream: retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.callee.local) or (stream.rtp.callee.local.host, 0) for stream in self.streams[cseq]] else: retval = [(stream.status in ['active', 'on hold']) and tuple(stream.rtp.caller.local) or (stream.rtp.caller.local.host, 0) for stream in self.streams[cseq]] return retval def cleanup(self): self.end_time = time() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: for stream in self.streams[cseq]: stream.cleanup() def stream_expired(self, stream): active_streams = set() for cseq in [self.previous_cseq, self.cseq]: if cseq is not None: active_streams.update({stream for stream in self.streams[cseq] if stream.is_alive}) if len(active_streams) == 0: self.manager.session_expired(self.call_id, self.from_tag) @property def duration(self): if self.start_time is not None: if self.end_time is not None: return int(self.end_time - self.start_time) else: return int(time() - self.start_time) else: return 0 @property def relayed_bytes(self): - return sum(stream.counters.relayed_bytes for stream in set(chain(*self.streams.itervalues()))) + return sum(stream.counters.relayed_bytes for stream in set(chain(*iter(self.streams.values())))) @property def statistics(self): - all_streams = set(chain(*self.streams.itervalues())) + all_streams = set(chain(*iter(self.streams.values()))) attributes = ('call_id', 'from_tag', 'from_uri', 'to_tag', 'to_uri', 'start_time', 'duration') stats = dict((name, getattr(self, name)) for name in attributes) stats['caller_ua'] = self.caller_ua or 'Unknown' stats['callee_ua'] = self.callee_ua or 'Unknown' stats['streams'] = streams = [] stream_attributes = ('media_type', 'status', 'timeout_wait') for stream in sorted(all_streams, key=attrgetter('start_time')): # type: MediaStream info = dict((name, getattr(stream, name)) for name in stream_attributes) info['caller_codec'] = stream.rtp.caller.codec info['callee_codec'] = stream.rtp.callee.codec if stream.start_time is None: info['start_time'] = info['end_time'] = None elif self.start_time is None: info['start_time'] = info['end_time'] = 0 else: info['start_time'] = max(int(stream.start_time - self.start_time), 0) if stream.status == 'rejected': info['end_time'] = info['start_time'] else: if stream.end_time is None: info['end_time'] = stats['duration'] else: info['end_time'] = min(int(stream.end_time - self.start_time), self.duration) if stream.first_media_time is None: info['post_dial_delay'] = None else: info['post_dial_delay'] = stream.first_media_time - stream.create_time caller = stream.rtp.caller callee = stream.rtp.callee info.update(stream.counters) info['caller_local'] = str(caller.local) info['callee_local'] = str(callee.local) info['caller_remote'] = str(caller.remote) info['callee_remote'] = str(callee.remote) streams.append(info) return stats class SessionManager(Logger): - implements(IReadDescriptor) + @implementer(IReadDescriptor) def __init__(self, relay, start_port, end_port): self.relay = relay - self.ports = deque((i, i + 1) for i in xrange(start_port, end_port, 2)) + self.ports = deque((i, i + 1) for i in range(start_port, end_port, 2)) self.bad_ports = deque() self.sessions = {} self.watcher = _conntrack.ExpireWatcher() self.active_byte_counter = 0 # relayed byte counter for sessions active during last speed measurement self.closed_byte_counter = 0 # relayed byte counter for sessions closed after last speed measurement self.bps_relayed = 0 if RelayConfig.traffic_sampling_period > 0: self.speed_calculator = RecurrentCall(RelayConfig.traffic_sampling_period, self._measure_speed) else: self.speed_calculator = None reactor.addReader(self) def _measure_speed(self): start_time = time() - current_byte_counter = sum(session.relayed_bytes for session in self.sessions.itervalues()) + current_byte_counter = sum(session.relayed_bytes for session in self.sessions.values()) self.bps_relayed = 8 * (current_byte_counter + self.closed_byte_counter - self.active_byte_counter) / RelayConfig.traffic_sampling_period self.active_byte_counter = current_byte_counter self.closed_byte_counter = 0 us_taken = int((time() - start_time) * 1000000) if us_taken > 10000: log.warning('Aggregate speed calculation time exceeded 10ms: %d us for %d sessions' % (us_taken, len(self.sessions))) return KeepRunning # implemented for IReadDescriptor def fileno(self): return self.watcher.fd def doRead(self): stream = self.watcher.read() if stream: stream.expired_func() def connectionLost(self, reason): reactor.removeReader(self) # port management def get_ports(self): if len(self.bad_ports) > len(self.ports): log.debug('Excessive amount of bad ports, doing cleanup') self.ports.extend(self.bad_ports) self.bad_ports = deque() try: return self.ports.popleft() except IndexError: raise RelayPortsExhaustedError() def set_bad_ports(self, ports): self.bad_ports.append(ports) def free_ports(self, ports): self.ports.append(ports) # called by higher level def _find_session_key(self, call_id, from_tag, to_tag): key_from = (call_id, from_tag) if key_from in self.sessions: return key_from if to_tag: key_to = (call_id, to_tag) if key_to in self.sessions: return key_to return None def has_session(self, call_id, from_tag, to_tag=None, **kw): return any((call_id, tag) in self.sessions for tag in (from_tag, to_tag) if tag is not None) def update_session(self, dispatcher, call_id, from_tag, from_uri, to_uri, cseq, user_agent, type, media=[], to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) if key: session = self.sessions[key] is_downstream = (session.from_tag != from_tag) ^ (type == 'request') is_caller_cseq = (session.from_tag == from_tag) session.update_media(cseq, to_tag, user_agent, media, is_downstream, is_caller_cseq) elif type == 'reply' and not media: return None else: is_downstream = type == 'request' is_caller_cseq = True session = Session(self, dispatcher, call_id, from_tag, from_uri, to_tag, to_uri, cseq, user_agent, media, is_downstream, is_caller_cseq) self.sessions[(call_id, from_tag)] = session self.relay.add_session(dispatcher) return session.get_local_media(is_downstream, cseq, is_caller_cseq) def remove_session(self, call_id, from_tag, to_tag=None, **kw): key = self._find_session_key(call_id, from_tag, to_tag) try: session = self.sessions[key] except KeyError: log.warning('The dispatcher tried to remove a session which is no longer present on the relay') return None session.logger.info('removed') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] reactor.callLater(0, self.relay.remove_session, session.dispatcher) return session def session_expired(self, call_id, from_tag): key = (call_id, from_tag) try: session = self.sessions[key] except KeyError: log.warning('A session expired but is no longer present on the relay') return session.logger.info('expired') session.cleanup() self.closed_byte_counter += session.relayed_bytes del self.sessions[key] self.relay.session_expired(session) self.relay.remove_session(session.dispatcher) def cleanup(self): if self.speed_calculator is not None: self.speed_calculator.cancel() - for key in self.sessions.keys(): + for key in list(self.sessions.keys()): self.session_expired(*key) @property def statistics(self): - return [session.statistics for session in self.sessions.itervalues()] + return [session.statistics for session in self.sessions.values()] @property def stream_count(self): stream_count = {} - for session in self.sessions.itervalues(): - for stream in set(chain(*session.streams.itervalues())): + for session in self.sessions.values(): + for stream in set(chain(*iter(session.streams.values()))): if stream.is_alive: stream_count[stream.media_type] = stream_count.get(stream.media_type, 0) + 1 return stream_count diff --git a/mediaproxy/relay.py b/mediaproxy/relay.py index 05f7951..b1e1ffa 100644 --- a/mediaproxy/relay.py +++ b/mediaproxy/relay.py @@ -1,396 +1,413 @@ """Implementation of the MediaProxy relay""" -import cjson +import json import signal import resource +""" try: from twisted.internet import epollreactor; epollreactor.install() except: raise RuntimeError('mandatory epoll reactor support is not available from the twisted framework') +""" from application import log from application.process import process from gnutls.errors import CertificateError, CertificateSecurityError from gnutls.interfaces.twisted import TLSContext from time import time from twisted.protocols.basic import LineOnlyReceiver +from twisted.protocols.policies import TimeoutMixin from twisted.internet.error import ConnectionDone, TCPTimedOutError, DNSLookupError from twisted.internet.protocol import ClientFactory, connectionDone from twisted.internet.defer import DeferredList, succeed from twisted.internet import reactor from twisted.python import failure from twisted.names import dns from twisted.names.client import lookupService from twisted.names.error import DomainError from mediaproxy import __version__ from mediaproxy.configuration import RelayConfig from mediaproxy.headers import DecodingDict, DecodingError from mediaproxy.mediacontrol import SessionManager, RelayPortsExhaustedError from mediaproxy.scheduler import RecurrentCall, KeepRunning from mediaproxy.tls import X509Credentials # Increase the system limit for the maximum number of open file descriptors # to be able to handle connections to all ports in port_range + fd_limit = RelayConfig.port_range.end - RelayConfig.port_range.start + 1000 try: resource.setrlimit(resource.RLIMIT_NOFILE, (fd_limit, fd_limit)) except ValueError: raise RuntimeError('Cannot set resource limit for maximum open file descriptors to %d' % fd_limit) else: new_limits = resource.getrlimit(resource.RLIMIT_NOFILE) if new_limits < (fd_limit, fd_limit): raise RuntimeError("Allocated resource limit for maximum open file descriptors is less then requested (%d instead of %d)" % (new_limits[0], fd_limit)) else: log.info('Set resource limit for maximum open file descriptors to %d' % fd_limit) -class RelayClientProtocol(LineOnlyReceiver): +class RelayClientProtocol(LineOnlyReceiver, TimeoutMixin): noisy = False required_headers = {'update': {'call_id', 'from_tag', 'from_uri', 'to_uri', 'cseq', 'user_agent', 'type'}, 'remove': {'call_id', 'from_tag'}, 'summary': set(), 'sessions': set()} def __init__(self): self.command = None self.seq = None self.headers = DecodingDict() self._connection_watcher = None self._queued_keepalives = 0 def _send_keepalive(self): if self._queued_keepalives >= 3: log.error('missed 3 keepalive answers in a row. assuming the connection is down.') # do not use loseConnection() as it waits to flush the output buffers. reactor.callLater(0, self.transport.connectionLost, failure.Failure(TCPTimedOutError())) return None - self.transport.write('ping' + self.delimiter) + self.transport.write(b'ping' + self.delimiter) self._queued_keepalives += 1 return KeepRunning def reply(self, reply): - self.transport.write(reply + self.delimiter) + log.debug(f"Send reply: {reply} to {self.transport.getPeer().host}:{self.transport.getPeer().port}") + self.transport.write(reply.encode() + self.delimiter) def connectionMade(self): peer = self.transport.getPeer() log.info('Connected to dispatcher at %s:%d' % (peer.host, peer.port)) if RelayConfig.passport is not None: peer_cert = self.transport.getPeerCertificate() + log.debug(f"peer {self.transport.getPeer().host}:{self.transport.getPeer().port} {peer_cert.subject}") if not RelayConfig.passport.accept(peer_cert): + log.debug("Media dispatcher certificate %s refused" % peer_cert.subject) self.transport.loseConnection() self._connection_watcher = RecurrentCall(RelayConfig.keepalive_interval, self._send_keepalive) def connectionLost(self, reason=connectionDone): if self._connection_watcher is not None: self._connection_watcher.cancel() self._connection_watcher = None self._queued_keepalives = 0 def lineReceived(self, line): + line = line.decode() + log.debug(f"Line received: {line} from {self.transport.getPeer().host}:{self.transport.getPeer().port}") if line == 'pong': self._queued_keepalives -= 1 return if self.command is None: try: command, seq = line.split() except ValueError: log.error('Could not decode command/sequence number pair from dispatcher: %s' % line) return if command in self.required_headers: self.command = command self.seq = seq self.headers = DecodingDict() else: log.error('Unknown command: %s' % command) self.reply('{} error'.format(seq)) elif line == '': missing_headers = self.required_headers[self.command].difference(self.headers) if missing_headers: for header in missing_headers: log.error('Missing mandatory header %r from %r command' % (header, self.command)) response = 'error' else: # noinspection PyBroadException try: response = self.factory.parent.got_command(self.factory.host, self.command, self.headers) except Exception: log.exception() response = 'error' self.reply('{} {}'.format(self.seq, response)) self.command = None else: try: name, value = line.split(": ", 1) except ValueError: log.error('Unable to parse header: %s' % line) else: try: self.headers[name] = value - except DecodingError, e: + except DecodingError as e: log.error('Could not decode header: %s' % e) class DispatcherConnectingFactory(ClientFactory): noisy = False protocol = RelayClientProtocol def __init__(self, parent, host, port): self.parent = parent self.host = (host, port) self.delayed = None self.connection_lost = False def __eq__(self, other): return self.host == other.host def clientConnectionFailed(self, connector, reason): log.error('Could not connect to dispatcher at %(host)s:%(port)d (retrying in %%d seconds): %%s' % connector.__dict__ % (RelayConfig.reconnect_delay, reason.value)) if self.parent.connector_needs_reconnect(connector): self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) def clientConnectionLost(self, connector, reason): self.cancel_delayed() - if reason.type != ConnectionDone: - log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value) - else: - log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__) +# if reason.type != ConnectionDone: + log.error('Connection with dispatcher at %(host)s:%(port)d was lost: %%s' % connector.__dict__ % reason.value) +# else: +# log.info('Connection with dispatcher at %(host)s:%(port)d was closed' % connector.__dict__) if self.parent.connector_needs_reconnect(connector): if isinstance(reason.value, CertificateError) or self.connection_lost: self.delayed = reactor.callLater(RelayConfig.reconnect_delay, connector.connect) else: self.delayed = reactor.callLater(min(RelayConfig.reconnect_delay, 1), connector.connect) self.connection_lost = True def buildProtocol(self, addr): self.delayed = reactor.callLater(5, self._connected_successfully) return ClientFactory.buildProtocol(self, addr) def _connected_successfully(self): + log.debug('Connected successfully') self.connection_lost = False def cancel_delayed(self): if self.delayed: if self.delayed.active(): self.delayed.cancel() self.delayed = None class SRVMediaRelayBase(object): def __init__(self): self.shutting_down = False self.srv_monitor = RecurrentCall(RelayConfig.dns_check_interval, self._do_lookup) self._do_lookup() def _do_lookup(self): defers = [] for addr, port, is_domain in RelayConfig.dispatchers: if is_domain: defer = lookupService("_sip._udp.%s" % addr) defer.addCallback(self._cb_got_srv, port) defer.addErrback(self._eb_no_srv, addr, port) defers.append(defer) else: defers.append(succeed((addr, port))) defer = DeferredList(defers) defer.addCallback(self._cb_got_all) return KeepRunning - def _cb_got_srv(self, (answers, auth, add), port): + def _cb_got_srv(self, answ_auth_add, port): + (answers, auth, add) = answ_auth_add for answer in answers: if answer.type == dns.SRV and answer.payload and answer.payload.target != dns.Name("."): return str(answer.payload.target), port raise DomainError def _eb_no_srv(self, failure, addr, port): failure.trap(DomainError) return reactor.resolve(addr).addCallback(lambda host: (host, port)).addErrback(self._eb_no_dns, addr) def _eb_no_dns(self, failure, addr): failure.trap(DNSLookupError) log.error("Could resolve neither SRV nor A record for '%s'" % addr) def _cb_got_all(self, results): if not self.shutting_down: dispatchers = [result[1] for result in results if result[0] and result[1] is not None] self.update_dispatchers(dispatchers) def update_dispatchers(self, dispatchers): raise NotImplementedError() def run(self): process.signals.add_handler(signal.SIGHUP, self._handle_signal) process.signals.add_handler(signal.SIGINT, self._handle_signal) process.signals.add_handler(signal.SIGTERM, self._handle_signal) process.signals.add_handler(signal.SIGUSR1, self._handle_signal) reactor.run(installSignalHandlers=False) def stop(self, graceful=False): reactor.callFromThread(self._shutdown, graceful=graceful) def _handle_signal(self, signum, frame): if signum == signal.SIGUSR1: # toggle debugging if log.level.current != log.level.DEBUG: log.level.current = log.level.DEBUG log.info('Switched logging level to DEBUG') else: log.info('Switched logging level to {}'.format(RelayConfig.log_level)) log.level.current = RelayConfig.log_level else: # terminate program signal_map = {signal.SIGTERM: 'Terminated', signal.SIGINT: 'Interrupted', signal.SIGHUP: 'Graceful shutdown'} log.info(signal_map.get(signum, 'Received signal {}, exiting.'.format(signum))) self.stop(graceful=(signum == signal.SIGHUP)) def _shutdown(self, graceful=False): raise NotImplementedError() @staticmethod def _shutdown_done(): reactor.stop() try: from mediaproxy.sipthor import SIPThorMediaRelayBase as MediaRelayBase except ImportError: MediaRelayBase = SRVMediaRelayBase class MediaRelay(MediaRelayBase): def __init__(self): self.cred = X509Credentials(cert_name='relay') self.tls_context = TLSContext(self.cred) self.session_manager = SessionManager(self, RelayConfig.port_range.start, RelayConfig.port_range.end) self.dispatchers = set() self.dispatcher_session_count = {} self.dispatcher_connectors = {} self.old_connectors = {} self.shutting_down = False self.graceful_shutdown = False self.start_time = time() - super(MediaRelay, self).__init__() + super().__init__() @property def status(self): if self.graceful_shutdown or self.shutting_down: return 'halting' else: return 'active' def update_dispatchers(self, dispatchers): dispatchers = set(dispatchers) for new_dispatcher in dispatchers.difference(self.dispatchers): - if new_dispatcher in self.old_connectors.iterkeys(): + if new_dispatcher in iter(self.old_connectors.keys()): log.info('Restoring old dispatcher at %s:%d' % new_dispatcher) self.dispatcher_connectors[new_dispatcher] = self.old_connectors.pop(new_dispatcher) else: log.info('Adding new dispatcher at %s:%d' % new_dispatcher) dispatcher_addr, dispatcher_port = new_dispatcher factory = DispatcherConnectingFactory(self, dispatcher_addr, dispatcher_port) - self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, dispatcher_port, factory, self.tls_context) + self.dispatcher_connectors[new_dispatcher] = reactor.connectTLS(dispatcher_addr, + dispatcher_port, + factory, + self.tls_context) for old_dispatcher in self.dispatchers.difference(dispatchers): try: self.old_connectors[old_dispatcher] = self.dispatcher_connectors.pop(old_dispatcher) except KeyError: pass else: log.info('Removing old dispatcher at %s:%d' % old_dispatcher) self._check_disconnect(old_dispatcher) self.dispatchers = dispatchers def got_command(self, dispatcher, command, headers): if command == 'summary': summary = {'ip': RelayConfig.relay_ip, 'version': __version__, 'status': self.status, 'uptime': int(time() - self.start_time), 'session_count': len(self.session_manager.sessions), 'stream_count': self.session_manager.stream_count, 'bps_relayed': self.session_manager.bps_relayed} - return cjson.encode(summary) + return json.dumps(summary) elif command == 'sessions': - return cjson.encode(self.session_manager.statistics) + return json.dumps(self.session_manager.statistics) elif command == 'update': if self.graceful_shutdown or self.shutting_down: if not self.session_manager.has_session(**headers): log.info('cannot add new session: media-relay is shutting down') return 'halting' try: local_media = self.session_manager.update_session(dispatcher, **headers) except RelayPortsExhaustedError: log.error('Could not reserve relay ports for session, all allocated ports are being used') return 'error' if local_media: return ' '.join([RelayConfig.advertised_ip or local_media[0][0]] + [str(media[1]) for media in local_media]) else: # command == 'remove' session = self.session_manager.remove_session(**headers) if session is None: return 'error' else: - return cjson.encode(session.statistics) + return json.dumps(session.statistics) def session_expired(self, session): connector = self.dispatcher_connectors.get(session.dispatcher) if connector is None: connector = self.old_connectors.get(session.dispatcher) if connector and connector.state == 'connected': - connector.transport.write(' '.join(['expired', cjson.encode(session.statistics)]) + connector.factory.protocol.delimiter) +# connector.transport.write(' '.join(['expired', json.dumps(session.statistics)]) + connector.factory.protocol.delimiter) + to_write = [elem.encode() for elem in ['expired', json.dumps(session.statistics)]] + connector.transport.write(connector.factory.protocol.delimiter.join(to_write) + connector.factory.protocol.delimiter) + else: log.warning('dispatcher for expired session is no longer online, statistics are lost!') def add_session(self, dispatcher): self.dispatcher_session_count[dispatcher] = self.dispatcher_session_count.get(dispatcher, 0) + 1 def remove_session(self, dispatcher): self.dispatcher_session_count[dispatcher] -= 1 if self.dispatcher_session_count[dispatcher] == 0: del self.dispatcher_session_count[dispatcher] if self.graceful_shutdown and not self.dispatcher_session_count: self._shutdown() elif dispatcher in self.old_connectors: self._check_disconnect(dispatcher) def _check_disconnect(self, dispatcher): connector = self.old_connectors[dispatcher] if self.dispatcher_session_count.get(dispatcher, 0) == 0: old_state = connector.state connector.factory.cancel_delayed() connector.disconnect() if old_state == "disconnected": del self.old_connectors[dispatcher] if self.shutting_down and len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown_done() def connector_needs_reconnect(self, connector): - if connector in self.dispatcher_connectors.values(): + if connector in list(self.dispatcher_connectors.values()): return True else: - for dispatcher, old_connector in self.old_connectors.items(): + for dispatcher, old_connector in list(self.old_connectors.items()): if old_connector is connector: if self.dispatcher_session_count.get(dispatcher, 0) > 0: return True else: del self.old_connectors[dispatcher] break if self.shutting_down: if len(self.old_connectors) == 0: self._shutdown_done() return False def _shutdown(self, graceful=False): if graceful: self.graceful_shutdown = True if self.dispatcher_session_count: return if not self.shutting_down: self.shutting_down = True self.srv_monitor.cancel() self.session_manager.cleanup() if len(self.dispatcher_connectors) + len(self.old_connectors) == 0: self._shutdown_done() else: self.update_dispatchers([]) diff --git a/mediaproxy/scheduler.py b/mediaproxy/scheduler.py index dfa4a3d..023716a 100644 --- a/mediaproxy/scheduler.py +++ b/mediaproxy/scheduler.py @@ -1,46 +1,46 @@ """Schedule calls on the twisted reactor""" __all__ = ['RecurrentCall', 'KeepRunning'] from time import time class KeepRunning: """Return this class from a recurrent function to indicate that it should keep running""" pass class RecurrentCall(object): """Execute a function repeatedly at the given interval, until signaled to stop""" def __init__(self, period, func, *args, **kwargs): from twisted.internet import reactor self.func = func self.args = args self.kwargs = kwargs self.period = period self.now = None self.next = None self.callid = reactor.callLater(period, self) def __call__(self): from twisted.internet import reactor self.callid = None if self.now is None: self.now = time() self.next = self.now + self.period else: self.now, self.next = self.next, self.next + self.period result = self.func(*self.args, **self.kwargs) if result is KeepRunning: - delay = max(self.next-time(), 0) + delay = max(self.next - time(), 0) self.callid = reactor.callLater(delay, self) def cancel(self): if self.callid is not None: try: self.callid.cancel() except ValueError: pass self.callid = None diff --git a/mediaproxy/tls.py b/mediaproxy/tls.py index f0e7518..b3ba24b 100644 --- a/mediaproxy/tls.py +++ b/mediaproxy/tls.py @@ -1,89 +1,91 @@ """TLS support""" __all__ = ['X509Credentials'] import os import stat from application.process import process from gnutls import crypto from gnutls.interfaces import twisted from mediaproxy.configuration import TLSConfig class FileDescriptor(object): def __init__(self, name, type): certs_path = os.path.normpath(TLSConfig.certs_path) +# print(f"Tls config from {certs_path}") self.path = os.path.join(certs_path, name) self.klass = type self.timestamp = 0 self.object = None def get(self): path = process.configuration.file(self.path) if path is None: raise RuntimeError('missing or unreadable file: %s' % self.path) mtime = os.stat(path)[stat.ST_MTIME] if self.timestamp < mtime: f = open(path) try: self.object = self.klass(f.read()) self.timestamp = mtime finally: f.close() return self.object class X509Entity(object): type = None def __init__(self, name_attr): self.name_attr = name_attr self.descriptors = {} def __get__(self, obj, type_=None): name = getattr(obj or type_, self.name_attr, None) if name is None: return None descriptor = self.descriptors.setdefault(name, FileDescriptor(name, self.type)) return descriptor.get() def __set__(self, obj, value): raise AttributeError('cannot set attribute') def __delete__(self, obj): raise AttributeError('cannot delete attribute') class X509Certificate(X509Entity): type = crypto.X509Certificate class X509PrivateKey(X509Entity): type = crypto.X509PrivateKey class X509CRL(X509Entity): type = crypto.X509CRL class X509Credentials(twisted.X509Credentials): """SIPThor X509 credentials""" X509cert_name = None # will be defined by each instance X509key_name = None # will be defined by each instance X509ca_name = 'ca.pem' X509crl_name = 'crl.pem' X509cert = X509Certificate(name_attr='X509cert_name') X509key = X509PrivateKey(name_attr='X509key_name') X509ca = X509Certificate(name_attr='X509ca_name') X509crl = X509CRL(name_attr='X509crl_name') def __init__(self, cert_name): self.X509cert_name = '%s.crt' % cert_name self.X509key_name = '%s.key' % cert_name +# print(f"cert file called {cert_name}") twisted.X509Credentials.__init__(self, self.X509cert, self.X509key, [self.X509ca], [self.X509crl]) self.verify_peer = True self.verify_period = TLSConfig.verify_interval diff --git a/setup.py b/setup.py index e2a8d2f..e6bd61d 100755 --- a/setup.py +++ b/setup.py @@ -1,54 +1,54 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 import re import sys import mediaproxy from distutils.core import setup, Extension # Get the title and description from README readme = open('README').read() title, description = re.findall(r'^\s*([^\n]+)\s+(.*)$', readme, re.DOTALL)[0] # media-relay is not supported on non-linux platforms # -if sys.platform == 'linux2': +if 'linux' in sys.platform: scripts = ['media-relay', 'media-dispatcher'] ext_modules = [Extension(name='mediaproxy.interfaces.system._conntrack', sources=['mediaproxy/interfaces/system/_conntrack.c'], libraries=['netfilter_conntrack', 'ip4tc'], define_macros=[('MODULE_VERSION', mediaproxy.__version__)])] else: print('WARNING: skipping the media relay component as this is a non-linux platform') scripts = ['media-dispatcher'] ext_modules = [] setup( name='mediaproxy', version=mediaproxy.__version__, description=title, long_description=description, url='http://www.ag-projects.com/MediaProxy.html', author='AG Projects', author_email='support@ag-projects.com', license='GPLv2', platforms=['Linux'], classifiers=[ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Service Providers', 'License :: GNU General Public License (GPLv2)', 'Operating System :: POSIX :: Linux', 'Programming Language :: Python', 'Programming Language :: C' ], packages=['mediaproxy', 'mediaproxy.configuration', 'mediaproxy.interfaces', 'mediaproxy.interfaces.accounting', 'mediaproxy.interfaces.system'], data_files=[('/etc/mediaproxy', ['config.ini.sample']), ('/etc/mediaproxy/radius', ['radius/dictionary']), ('/etc/mediaproxy/tls', ['tls/README'])], scripts=scripts, ext_modules=ext_modules ) diff --git a/test/common.py b/test/common.py index 928914f..f5f6317 100644 --- a/test/common.py +++ b/test/common.py @@ -1,215 +1,217 @@ # Copyright (C) 2008 AG Projects # import sys; sys.path.extend(['.', '..']) import os import random import string import struct import mediaproxy from application.configuration import * from application.process import process from application.system import host from twisted.internet import reactor from twisted.internet.defer import Deferred, DeferredList, succeed from twisted.internet.protocol import DatagramProtocol, ClientFactory from twisted.internet.task import LoopingCall from twisted.protocols.basic import LineOnlyReceiver from mediaproxy.headers import EncodingDict process.configuration.user_directory = None process.configuration.subdirectory = mediaproxy.mediaproxy_subdirectory class Config(ConfigSection): __cfgfile__ = mediaproxy.configuration_file __section__ = 'Dispatcher' socket = '/run/mediaproxy/dispatcher.sock' random_data = os.urandom(512) stun_data = struct.pack('!HHIIII', 0x0001, 0, 0x2112A442, 0, 0, 0) default_host_ip = host.default_ip class OpenSIPSControlClientProtocol(LineOnlyReceiver): def __init__(self): self.defer = None def lineReceived(self, line): + line = line.decode() if line == 'error': print('got error from dispatcher!') reactor.stop() elif self.defer is not None: - print('got ip/ports from dispatcher: %s' % line) + print(('got ip/ports from dispatcher: %s' % line)) ip, ports = line.split(' ', 1) defer = self.defer self.defer = None defer.callback((ip, [int(i) for i in ports.split()])) else: - print('got reply from dispatcher: %s' % line) + print(('got reply from dispatcher: %s' % line)) defer = self.defer self.defer = None defer.callback(line) def _send_command(self, command, headers): self.defer = Deferred() - data = self.delimiter.join([command] + ['%s: %s' % item for item in headers.iteritems()]) + 2*self.delimiter - # print('writing on socket:\n%s' % data) - self.transport.write(data) + data = self.delimiter.decode().join([command] + ['%s: %s' % item for item in headers.items()]) + 2 * self.delimiter.decode() + print('writing on socket:\n%s' % data) + self.transport.write(data.encode()) return self.defer def update(self, **kw_args): return self._send_command('update', EncodingDict(kw_args)) def remove(self, **kw_args): return self._send_command('remove', EncodingDict(kw_args)) class OpenSIPSConnectorFactory(ClientFactory): protocol = OpenSIPSControlClientProtocol def __init__(self): self.defer = Deferred() def buildProtocol(self, addr): prot = ClientFactory.buildProtocol(self, addr) reactor.callLater(0, self.defer.callback, prot) return prot class MediaReceiverProtocol(DatagramProtocol): def __init__(self, endpoint, index): self.endpoint = endpoint self.index = index self.loop = None self.received_media = False self.defer = Deferred() - def datagramReceived(self, data, (host, port)): + def datagramReceived(self, data, addr): + (host, port) = addr if not self.received_media: self.received_media = True - print('received media %d for %s from %s:%d' % (self.index, self.endpoint.name, host, port)) + print(('received media %d for %s from %s:%d' % (self.index, self.endpoint.name, host, port))) self.defer.callback(None) def connectionRefused(self): - print('connection refused for media %d for %s' % (self.index, self.endpoint.name)) + print(('connection refused for media %d for %s' % (self.index, self.endpoint.name))) class Endpoint(object): def __init__(self, sip_uri, user_agent, is_caller): if is_caller: self.name = 'caller' else: self.name = 'callee' self.sip_uri = sip_uri self.user_agent = user_agent self.tag = ''.join(random.sample(string.ascii_lowercase, 8)) self.connectors = [] self.media = [] self.cseq = 1 def set_media(self, media): assert(len(self.connectors) == 0) self.media = media for index, (media_type, port, direction, parameters) in enumerate(self.media): if port != 0: protocol = MediaReceiverProtocol(self, index) connector = reactor.listenUDP(port, protocol) else: connector = None self.connectors.append(connector) return DeferredList([connector.protocol.defer for connector in self.connectors if connector is not None]) def get_media(self, use_old_hold): if use_old_hold: ip = '0.0.0.0' else: ip = default_host_ip return [(media_type, ip, port, direction, parameters) for media_type, port, direction, parameters in self.media] def start_media(self, ip, ports, send_stun=False): for port, connector in zip(ports, self.connectors): if connector is not None: protocol = connector.protocol if port != 0: protocol.transport.connect(ip, port) protocol.loop = LoopingCall(protocol.transport.write, send_stun and stun_data or random_data) protocol.loop.start(random.uniform(0.5, 1)) else: protocol.defer.callback(None) def stop_media(self): defers = [] for connector in self.connectors: if connector is not None: if connector.protocol.loop is not None: connector.protocol.loop.stop() connector.protocol.loop = None defer = connector.stopListening() if defer is not None: defers.append(defer) self.connectors = [] if defers: return DeferredList(defers) else: return succeed(None) class Session(object): def __init__(self, caller, callee): self.caller = caller self.callee = callee self.call_id = ''.join(random.sample(string.ascii_letters, 24)) def _get_parties(self, party): party = getattr(self, party) if party is self.caller: other = self.callee else: other = self.caller return party, other def do_update(self, opensips, party, type, is_final, use_old_hold=False): party, other = self._get_parties(party) if type == 'request': from_tag = party.tag to_tag = other.tag from_uri = party.sip_uri to_uri = other.sip_uri cseq = party.cseq else: from_tag = other.tag to_tag = party.tag from_uri = other.sip_uri to_uri = party.sip_uri cseq = other.cseq if is_final: defer = opensips.update(call_id=self.call_id, from_tag=from_tag, to_tag=to_tag, from_uri=from_uri, to_uri=to_uri, cseq=cseq, user_agent=party.user_agent, media=party.get_media(use_old_hold), type=type, dialog_id='1234567890') else: defer = opensips.update(call_id=self.call_id, from_tag=from_tag, to_tag=to_tag, from_uri=from_uri, to_uri=to_uri, cseq=cseq, user_agent=party.user_agent, media=party.get_media(use_old_hold), type=type, dialog_id='1234567890') if is_final: if type == 'request': party.cseq += 1 else: other.cseq += 1 return defer def do_remove(self, opensips, party): party, other = self._get_parties(party) opensips.remove(call_id=self.call_id, from_tag=party.tag, to_tag=other.tag) def connect_to_dispatcher(): factory = OpenSIPSConnectorFactory() connector = reactor.connectUNIX(Config.socket, factory) return connector, factory.defer diff --git a/test/holdtest1.py b/test/holdtest1.py index ccdcc24..4278f5b 100755 --- a/test/holdtest1.py +++ b/test/holdtest1.py @@ -1,126 +1,128 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session that starts with 1 audio stream, then gets put on hold by the caller for 5 minutes, the gets taken out of hold again. This test uses the newer 'sendonly' direction attribute to indicate hold status. """ from common import * def phase1(protocol, session): print('setting up audio stream') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, phase2) return defer def phase2(result, protocol, session): print('setting stream on hold') session.caller.set_media([('audio', 40000, 'sendonly', {})]) session.callee.set_media([('audio', 30000, 'recvonly', {})]) defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update_hold, protocol, session) return defer def callee_update_hold(result, protocol, session): print('updating hold for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(wait_hold, protocol, session) return defer def wait_hold(result, protocol, session): print('on hold, waiting 5 minutes...') defer = Deferred() defer.addCallback(stop_media_hold, protocol, session) reactor.callLater(300, defer.callback, None) return defer def stop_media_hold(result, protocol, session): print('stopping media for hold') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(phase3, protocol, session) return defer def phase3(result, protocol, session): print('continuing audio stream') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, kthxbye) return defer def caller_update(result, protocol, session, media_defer, do_after): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, media_defer, do_after) return defer def callee_update(callee_addr, protocol, session, media_defer, do_after): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after): +def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait, protocol, session, do_after) return media_defer def wait(result, protocol, session, do_after): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, do_after) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, do_after): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(do_after, protocol, session) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) callee = Endpoint('Bob ', 'Callee UA', False) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(phase1, session) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/holdtest2.py b/test/holdtest2.py index 6f27731..0a2d2ea 100755 --- a/test/holdtest2.py +++ b/test/holdtest2.py @@ -1,126 +1,128 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session that starts with 1 audio stream, then gets put on hold by the caller for 5 minutes, the gets taken out of hold again. This test uses the older 0.0.0.0 IP address to indicate hold status. """ from common import * def phase1(protocol, session): print('setting up audio stream') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, phase2) return defer def phase2(result, protocol, session): print('setting stream on hold') session.caller.set_media([('audio', 40000, 'sendrecv', {})]) session.callee.set_media([('audio', 30000, 'sendrecv', {})]) defer = session.do_update(protocol, 'caller', 'request', False, True) defer.addCallback(callee_update_hold, protocol, session) return defer def callee_update_hold(result, protocol, session): print('updating hold for callee') defer = session.do_update(protocol, 'callee', 'reply', True, True) defer.addCallback(wait_hold, protocol, session) return defer def wait_hold(result, protocol, session): print('on hold, waiting 5 minutes...') defer = Deferred() defer.addCallback(stop_media_hold, protocol, session) reactor.callLater(300, defer.callback, None) return defer def stop_media_hold(result, protocol, session): print('stopping media for hold') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(phase3, protocol, session) return defer def phase3(result, protocol, session): print('continuing audio stream') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, kthxbye) return defer def caller_update(result, protocol, session, media_defer, do_after): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, media_defer, do_after) return defer def callee_update(callee_addr, protocol, session, media_defer, do_after): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after): +def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait, protocol, session, do_after) return media_defer def wait(result, protocol, session, do_after): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, do_after) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, do_after): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(do_after, protocol, session) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) callee = Endpoint('Bob ', 'Callee UA', False) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(phase1, session) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/holdtest3.py b/test/holdtest3.py index a0734fb..517b01a 100755 --- a/test/holdtest3.py +++ b/test/holdtest3.py @@ -1,103 +1,105 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session that starts with 1 audio stream, then gets put on hold by the caller and stops without a BYE after 10 seconds. It is meant to test the on hold timeout. """ from common import * def phase1(protocol, session): print('setting up audio stream') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, phase2) return defer def phase2(result, protocol, session): print('setting stream on hold') session.caller.set_media([('audio', 40000, 'sendonly', {})]) session.callee.set_media([('audio', 30000, 'recvonly', {})]) defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update_hold, protocol, session) return defer def callee_update_hold(result, protocol, session): print('updating hold for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(wait_hold, protocol, session) return defer def wait_hold(result, protocol, session): print('on hold, waiting 10 seconds...') defer = Deferred() reactor.callLater(10, defer.callback, None) return defer def caller_update(result, protocol, session, media_defer, do_after): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, media_defer, do_after) return defer def callee_update(callee_addr, protocol, session, media_defer, do_after): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after): +def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait, protocol, session, do_after) return media_defer def wait(result, protocol, session, do_after): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, do_after) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, do_after): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(do_after, protocol, session) return defer def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) callee = Endpoint('Bob ', 'Callee UA', False) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(phase1, session) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/icetest1.py b/test/icetest1.py index 3815444..9e4193c 100755 --- a/test/icetest1.py +++ b/test/icetest1.py @@ -1,96 +1,98 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2009 AG Projects # """ This test simulates a call flow with ICE where the relay is NOT selected as a candidate: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will send probing STUN requests for a few seconds - Both parties will stop the probes and not send media through the relay - After 4 minutes, the callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_stun, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_stun((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_stun(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting STUN probes for both parties') session.caller.start_media(caller_ip, caller_ports, send_stun=True) session.callee.start_media(callee_ip, callee_ports, send_stun=True) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait_stun, session, protocol) return defer def wait_stun(result, session, protocol): print('got STUN probes, waiting 3 seconds') defer = Deferred() defer.addCallback(stop_stun_caller, session, protocol) reactor.callLater(3, defer.callback, None) return defer def stop_stun_caller(result, session, protocol): print('stopping STUN probes for caller') defer = session.caller.stop_media() defer.addCallback(stop_stun_callee, session, protocol) return defer def stop_stun_callee(result, session, protocol): print('stopping STUN probes for callee') defer = session.callee.stop_media() defer.addCallback(wait_end, session, protocol) return defer def wait_end(result, session, protocol): print('media is flowing via a different path than the relay for 4 minutes') defer = Deferred() defer.addCallback(end, session, protocol) reactor.callLater(240, defer.callback, None) return defer def end(result, session, protocol): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/icetest2.py b/test/icetest2.py index 1e13d5d..d821652 100755 --- a/test/icetest2.py +++ b/test/icetest2.py @@ -1,115 +1,119 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2009 AG Projects # """ This test simulates a call flow with ICE where the relay is selected as a candidate: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will send probing STUN requests for a few seconds - Both parties will stop the probes and start sending media through the relay (Note that a re-INVITE will be sent, this is due to a limitatin in the test framework) - After 5 seconds, the caller will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting STUN probes for both parties') session.caller.start_media(caller_ip, caller_ports, send_stun=True) session.callee.start_media(callee_ip, callee_ports, send_stun=True) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got STUN, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session): print('stopping STUN probes') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session) return defer def change_callee(result, protocol, session): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'request', False) defer.addCallback(change_caller, protocol, session, media_defer) return defer -def change_caller((caller_ip, caller_ports), protocol, session, media_defer): +def change_caller(caller_addr, protocol, session, media_defer): + (caller_ip, caller_ports) = caller_addr print('sending new update for caller') defer = session.do_update(protocol, 'caller', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports) return defer -def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports): +def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports): + (callee_ip, callee_ports) = callee_addr print('starting media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session) return media_defer def wait2(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {'ice': 'yes'})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {'ice': 'yes'})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/multitest1.py b/test/multitest1.py index f1ffe13..6a378b5 100755 --- a/test/multitest1.py +++ b/test/multitest1.py @@ -1,74 +1,77 @@ -#!/usr/bin/python2 + +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 30 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(30, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/multitest2.py b/test/multitest2.py index e29352b..34be241 100755 --- a/test/multitest2.py +++ b/test/multitest2.py @@ -1,74 +1,77 @@ -#!/usr/bin/python2 + +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 35 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(35, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40001, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30001, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/multitest3.py b/test/multitest3.py index bb98f89..828227d 100755 --- a/test/multitest3.py +++ b/test/multitest3.py @@ -1,74 +1,76 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 25 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(25, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40002, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30002, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/multitest4.py b/test/multitest4.py index d35752f..51d57cf 100755 --- a/test/multitest4.py +++ b/test/multitest4.py @@ -1,74 +1,76 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 40 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(40, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40004, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30004, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest1.py b/test/setuptest1.py index 048fd9b..f1cd65b 100755 --- a/test/setuptest1.py +++ b/test/setuptest1.py @@ -1,39 +1,39 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test scenario simulates the caller sending an INVITE, nothing is received in return. The relay should discard the session after a while. """ from common import * def caller_update(protocol, session): print('doing update for caller') return session.do_update(protocol, 'caller', 'request', False) def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest2.py b/test/setuptest2.py index 1cd1f7a..f590902 100755 --- a/test/setuptest2.py +++ b/test/setuptest2.py @@ -1,74 +1,76 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest3.py b/test/setuptest3.py index 76934c6..ca52840 100755 --- a/test/setuptest3.py +++ b/test/setuptest3.py @@ -1,74 +1,76 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow without a BYE: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - Media will flow for 5 seconds - Both parties will stop sending media """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = callee_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session): print('stopping media for callee') return session.callee.stop_media() def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest4.py b/test/setuptest4.py index d8c6228..f17784b 100755 --- a/test/setuptest4.py +++ b/test/setuptest4.py @@ -1,85 +1,88 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow, with an added ACK confirming the SDP: - The caller sends an INVITE - the callee sends a 200 OK - Both parties will start sending media - the caller sends an ACK with SDP - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', False) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(caller_ack, protocol, session, callee_ip, callee_ports) return defer def caller_ack(result, protocol, session, callee_ip, callee_ports): print('got media, doing ACK for caller') defer = session.do_update(protocol, 'caller', 'request', True) defer.addCallback(wait, protocol, session, callee_ip, callee_ports) return defer -def wait((callee_ack_ip, callee_ack_ports), protocol, session, callee_ip, callee_ports): +def wait(callee_ack_addr, protocol, session, callee_ip, callee_ports): + (callee_ack_ip, callee_ack_ports) = callee_ack_addr print('waiting 5 seconds') assert (callee_ack_ip == callee_ip) assert (callee_ack_ports == callee_ports) defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest5.py b/test/setuptest5.py index b82a708..e18382c 100755 --- a/test/setuptest5.py +++ b/test/setuptest5.py @@ -1,74 +1,76 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates call setup where no SDP is sent in the INVITE: - the callee sends a 200 OK - the caller sends a ACK with SDP - Both parties will start sending media - Media will flow for 5 seconds - The callee will send a BYE """ from common import * def callee_update(protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', False) defer.addCallback(caller_update, protocol, session, caller_media, callee_media) return defer def caller_update(caller_addr, protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', True) defer.addCallback(do_media, caller_addr, protocol, session, caller_media, callee_media) return defer -def do_media((callee_ip, callee_ports), (caller_ip, caller_ports), protocol, session, caller_media, callee_media): +def do_media(callee_addr, caller_addr, protocol, session, caller_media, callee_media): + (callee_ip, callee_ports) = callee_addr + (caller_ip, caller_ports) = caller_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(callee_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest6.py b/test/setuptest6.py index dea99c5..55acdbb 100755 --- a/test/setuptest6.py +++ b/test/setuptest6.py @@ -1,88 +1,90 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session in which the caller proposes 3 streams in the INVITE and the callee rejects two of these. - The caller sends an INVITE with 1 video stream and 2 audio streams - the callee sends a 200 OK with the ports for two of the streams set to 0 - Both parties start sending media - Media flows for 5 seconds - The callee sends a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(change_caller1, callee_addr, protocol, session, caller_media, callee_media) return defer def change_caller1(caller_addr, callee_addr, protocol, session, caller_media, callee_media): print('stopping media for caller') defer = session.caller.stop_media() defer.addCallback(change_caller2, caller_addr, callee_addr, protocol, session, callee_media) return defer def change_caller2(result, caller_addr, callee_addr, protocol, session, callee_media): print('setting new media for caller') caller_media = caller.set_media([('audio', 0, 'sendrecv', {}), ('video', 0, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})]) return do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media) -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 0, 'sendrecv', {}), ('video', 0, 'sendrecv', {}), ('audio', 30020, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/setuptest7.py b/test/setuptest7.py index fa7ccc5..b1b242e 100755 --- a/test/setuptest7.py +++ b/test/setuptest7.py @@ -1,72 +1,74 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a normal call flow: - The caller sends an INVITE with a media stream with port=0 - The callee sends a 200 OK - The callee will send a BYE after 5 seconds """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'callee') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 0, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 40000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest1.py b/test/updatetest1.py index 344dd05..4000ff1 100755 --- a/test/updatetest1.py +++ b/test/updatetest1.py @@ -1,110 +1,113 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a call setup with an updated reply from the callee: - The caller sends an INVITE - The callee replies with .e.g a 183 - Both parties start sending media - Media flows for 5 seconds - Media stops - The callee sends a 200 OK with a new port - Media flows again for 5 seconds - The caller sends a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', False) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session, callee_ip, callee_ports) return defer def wait(result, protocol, session, callee_ip, callee_ports): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, callee_ip, callee_ports) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, callee_ip, callee_ports): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session, callee_ip, callee_ports) return defer def change_callee(result, protocol, session, callee_ip, callee_ports): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, callee_ip, callee_ports) return defer -def start_new_media((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports): +def start_new_media(caller_addr, protocol, session, media_defer, callee_ip, callee_ports): + (caller_ip, caller_ports) = caller_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session) return media_defer def wait2(result, protocol, session): print('got new media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest2.py b/test/updatetest2.py index edc1671..b38b0d8 100755 --- a/test/updatetest2.py +++ b/test/updatetest2.py @@ -1,116 +1,120 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session with audio and video media flowing, after which the callee removes the video stream and only audio flows: - caller sends INVITE, callee sends 200 ok - audio and video media flows for 5 seconds - callee proposes to keep only the audio stream using a re-INVITE, caller sends OK - audio media flows for 5 seconds - caller sends BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session) return defer def change_callee(result, protocol, session): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'request', False) defer.addCallback(change_caller, protocol, session, media_defer) return defer -def change_caller((caller_ip, caller_ports), protocol, session, media_defer): +def change_caller(caller_addr, protocol, session, media_defer): + (caller_ip, caller_ports) = caller_addr print('sending new update for caller') defer = session.do_update(protocol, 'caller', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports) return defer -def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports): +def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports): + (callee_ip, callee_ports) = callee_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session) return media_defer def wait2(result, protocol, session): print('got new media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest3.py b/test/updatetest3.py index 54fa48e..66831fc 100755 --- a/test/updatetest3.py +++ b/test/updatetest3.py @@ -1,104 +1,106 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session that starts with only video, then two audio streams are added and finally only one of the audio streams remains. """ from common import * def phase1(protocol, session): print('setting up 1 video stream') caller_media = session.caller.set_media([('video', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('video', 30000, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, phase2) return defer def phase2(result, protocol, session): print('adding 2 audio streams') caller_media = session.caller.set_media([('video', 40000, 'sendrecv', {}), ('audio', 40010, 'sendrecv', {}), ('audio', 40020, 'sendrecv', {})]) callee_media = session.callee.set_media([('video', 30000, 'sendrecv', {}), ('audio', 30010, 'sendrecv', {}), ('audio', 30020, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, phase3) return defer def phase3(result, protocol, session): print('removing 1 video and 1 audio stream') caller_media = session.caller.set_media([('audio', 40020, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = succeed(None) defer.addCallback(caller_update, protocol, session, media_defer, kthxbye) return defer def caller_update(result, protocol, session, media_defer, do_after): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, media_defer, do_after) return defer def callee_update(callee_addr, protocol, session, media_defer, do_after): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, media_defer, do_after) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, media_defer, do_after): +def do_media(caller_addr, callee_addr, protocol, session, media_defer, do_after): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait, protocol, session, do_after) return media_defer def wait(result, protocol, session, do_after): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, do_after) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, do_after): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(do_after, protocol, session) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) callee = Endpoint('Bob ', 'Callee UA', False) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(phase1, session) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest4.py b/test/updatetest4.py index a258450..7a44e87 100755 --- a/test/updatetest4.py +++ b/test/updatetest4.py @@ -1,115 +1,119 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session with audio media flowing, after which the callee changes the port of the media, e.g. through an UPDATE: - caller sends INVITE, callee sends 200 ok - audio and video media flows for 5 seconds - callee changes the port of the audio stream through an UPATE or re-INVITE - audio media flows for 5 seconds - caller sends BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session) return defer def change_callee(result, protocol, session): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30010, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'request', False) defer.addCallback(change_caller, protocol, session, media_defer) return defer -def change_caller((caller_ip, caller_ports), protocol, session, media_defer): +def change_caller(caller_addr, protocol, session, media_defer): + (caller_ip, caller_ports) = caller_addr print('sending new update for caller') defer = session.do_update(protocol, 'caller', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports) return defer -def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports): +def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports): + (callee_ip, callee_ports) = callee_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session) return media_defer def wait2(result, protocol, session): print('got new media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest5.py b/test/updatetest5.py index 34a2482..28cc790 100755 --- a/test/updatetest5.py +++ b/test/updatetest5.py @@ -1,148 +1,152 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a call setup with an updated reply from the callee: - The caller sends an INVITE - The callee replies with a provisional response containg SDP e.g. 183 - Both parties start sending media - Media flows for 5 seconds - Media stops - The callee sends another 183 with new port and to-tag (e.g. when the first PSTN gateway failed) - Both parties start sending media - Media flows for 5 seconds - Media stops - The callee sends a 200 OK with a new port - Media flows again for 5 seconds - The caller sends a BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', False) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session, callee_ip, callee_ports) return defer def wait(result, protocol, session, callee_ip, callee_ports): print('got media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session, callee_ip, callee_ports) reactor.callLater(5, defer.callback, None) return defer def stop_media(result, protocol, session, callee_ip, callee_ports): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee_prov, protocol, session, callee_ip, callee_ports) return defer def change_callee_prov(result, protocol, session, callee_ip, callee_ports): print('sending new provisional update for callee') session.callee.tag = 'newtotag' caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30010, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'reply', False) defer.addCallback(start_new_media_prov, protocol, session, media_defer, callee_ip, callee_ports) return defer -def start_new_media_prov((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports): +def start_new_media_prov(caller_addr, protocol, session, media_defer, callee_ip, callee_ports): + (caller_ip, caller_ports) = caller_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session, callee_ip, callee_ports) return media_defer def wait2(result, protocol, session, callee_ip, callee_ports): print('got new media, waiting 5 seconds') defer = Deferred() defer.addCallback(stop_media_prov, protocol, session, callee_ip, callee_ports) reactor.callLater(5, defer.callback, None) return defer def stop_media_prov(result, protocol, session, callee_ip, callee_ports): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session, callee_ip, callee_ports) return defer def change_callee(result, protocol, session, callee_ip, callee_ports): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30020, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, callee_ip, callee_ports) return defer -def start_new_media((caller_ip, caller_ports), protocol, session, media_defer, callee_ip, callee_ports): +def start_new_media(caller_addr, protocol, session, media_defer, callee_ip, callee_ports): + (caller_ip, caller_ports) = caller_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait3, protocol, session) return media_defer def wait3(result, protocol, session): print('got new media, waiting 5 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(5, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run() diff --git a/test/updatetest6.py b/test/updatetest6.py index 8ab6efe..9dff1c9 100755 --- a/test/updatetest6.py +++ b/test/updatetest6.py @@ -1,160 +1,166 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 # Copyright (C) 2008 AG Projects # """ This test simulates a session with audio and video media flowing, after which the callee removes the video stream and only audio flows. After a while, the video stream is introduced back and both audio and video flow for a while: - caller sends INVITE, callee sends 200 ok - audio and video media flows for 15 seconds - callee proposes to keep only the audio stream using a re-INVITE, caller sends OK - audio media flows for 15 seconds - callee proposes to reintroduce a video stream using a re-INVITE, caller sends OK - audio and video media flows for 15 seconds - caller sends BYE """ from common import * def caller_update(protocol, session, caller_media, callee_media): print('doing update for caller') defer = session.do_update(protocol, 'caller', 'request', False) defer.addCallback(callee_update, protocol, session, caller_media, callee_media) return defer def callee_update(callee_addr, protocol, session, caller_media, callee_media): print('doing update for callee') defer = session.do_update(protocol, 'callee', 'reply', True) defer.addCallback(do_media, callee_addr, protocol, session, caller_media, callee_media) return defer -def do_media((caller_ip, caller_ports), (callee_ip, callee_ports), protocol, session, caller_media, callee_media): +def do_media(caller_addr, callee_addr, protocol, session, caller_media, callee_media): + (caller_ip, caller_ports) = caller_addr + (callee_ip, callee_ports) = callee_addr print('starting media for both parties') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) defer = DeferredList([caller_media, callee_media]) defer.addCallback(wait, protocol, session) return defer def wait(result, protocol, session): print('got media, waiting 15 seconds') defer = Deferred() defer.addCallback(stop_media, protocol, session) reactor.callLater(15, defer.callback, None) return defer def stop_media(result, protocol, session): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee, protocol, session) return defer def change_callee(result, protocol, session): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 0, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 0, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'request', False) defer.addCallback(change_caller, protocol, session, media_defer) return defer -def change_caller((caller_ip, caller_ports), protocol, session, media_defer): +def change_caller(caller_addr, protocol, session, media_defer): + (caller_ip, caller_ports) = caller_addr print('sending new update for caller') defer = session.do_update(protocol, 'caller', 'reply', True) defer.addCallback(start_new_media, protocol, session, media_defer, caller_ip, caller_ports) return defer -def start_new_media((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports): +def start_new_media(callee_addr, protocol, session, media_defer, caller_ip, caller_ports): + (callee_ip, callee_ports) = calee_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait2, protocol, session) return media_defer def wait2(result, protocol, session): print('got new media, waiting 15 seconds') defer = Deferred() defer.addCallback(stop_media2, protocol, session) reactor.callLater(15, defer.callback, None) return defer def stop_media2(result, protocol, session): print('stopping media') defer = DeferredList([session.caller.stop_media(), session.callee.stop_media()]) defer.addCallback(change_callee2, protocol, session) return defer def change_callee2(result, protocol, session): print('sending new update for callee') caller_media = session.caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})]) callee_media = session.callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})]) media_defer = DeferredList([caller_media, callee_media]) defer = session.do_update(protocol, 'callee', 'request', False) defer.addCallback(change_caller2, protocol, session, media_defer) return defer -def change_caller2((caller_ip, caller_ports), protocol, session, media_defer): +def change_caller2(caller_addr, protocol, session, media_defer): + (caller_ip, caller_ports) = caller_addr print('sending new update for caller') defer = session.do_update(protocol, 'caller', 'reply', True) defer.addCallback(start_new_media2, protocol, session, media_defer, caller_ip, caller_ports) return defer -def start_new_media2((callee_ip, callee_ports), protocol, session, media_defer, caller_ip, caller_ports): +def start_new_media2(callee_addr, protocol, session, media_defer, caller_ip, caller_ports): + (callee_ip, callee_ports) = callee_addr print('starting new media') session.caller.start_media(caller_ip, caller_ports) session.callee.start_media(callee_ip, callee_ports) media_defer.addCallback(wait3, protocol, session) return media_defer def wait3(result, protocol, session): print('got new media, waiting 15 seconds') defer = Deferred() defer.addCallback(kthxbye, protocol, session) reactor.callLater(15, defer.callback, None) return defer def kthxbye(result, protocol, session): print('sending remove') return session.do_remove(protocol, 'caller') def disconnect(result, connector): print('disconnecting') connector.disconnect() reactor.callLater(1, reactor.stop) def catch_all_err(failure): print(failure) if __name__ == '__main__': caller = Endpoint('Alice ', 'Caller UA', True) caller_media = caller.set_media([('audio', 40000, 'sendrecv', {}), ('video', 40010, 'sendrecv', {})]) callee = Endpoint('Bob ', 'Callee UA', False) callee_media = callee.set_media([('audio', 30000, 'sendrecv', {}), ('video', 30010, 'sendrecv', {})]) session = Session(caller, callee) connector, defer = connect_to_dispatcher() defer.addCallback(caller_update, session, caller_media, callee_media) defer.addCallback(disconnect, connector) defer.addErrback(catch_all_err) reactor.run()