diff --git a/media-dispatcher b/media-dispatcher index c774e74..fb1d7ed 100644 --- a/media-dispatcher +++ b/media-dispatcher @@ -1,64 +1,64 @@ #!/usr/bin/env python """MediaProxy Dispatcher component""" if __name__ == "__main__": import sys from optparse import OptionParser from application.process import process, ProcessError from application.configuration import ConfigFile, datatypes from application import log import mediaproxy name = "media-dispatcher" fullname = "MediaProxy Dispatcher" description = "MediaProxy Dispatcher component" default_pid = mediaproxy.runtime_directory + '/dispatcher.pid' parser = OptionParser(version="%%prog %s" % mediaproxy.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)") parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File") (options, args) = parser.parse_args() pid_file = options.pid_file - process.system_config_directory = mediaproxy.system_config_directory + process.system_config_directory = mediaproxy.config_directory config_file = ConfigFile(mediaproxy.configuration_filename) log.level.current = config_file.get_setting("Dispatcher", 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) try: process.runtime_directory = mediaproxy.runtime_directory except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) if options.fork: try: process.daemonize(pid_file) except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) log.start_syslog(name) log.msg("Starting %s %s" % (fullname, mediaproxy.__version__)) from mediaproxy.dispatcher import Dispatcher if not options.fork: from application.debug.memory import memory_dump try: dispatcher = Dispatcher() except Exception, e: log.fatal("failed to create %s: %s" % (fullname, e)) if e.__class__ is not RuntimeError: log.err() sys.exit(1) dispatcher.run() if not options.fork: #from application.debug.memory import memory_dump memory_dump() diff --git a/media-relay b/media-relay index b64b8e3..1c79472 100644 --- a/media-relay +++ b/media-relay @@ -1,108 +1,108 @@ #!/usr/bin/env python """MediaProxy Relay component""" from __future__ import with_statement if __name__ == "__main__": import errno import sys import subprocess from optparse import OptionParser from application import log from application.configuration import ConfigFile, datatypes from application.process import process, ProcessError from application.version import Version import mediaproxy IP_FORWARD_FILE = "/proc/sys/net/ipv4/ip_forward" CONNTRACK_ACCT_FILE = "/proc/sys/net/netfilter/nf_conntrack_acct" KERNEL_VERSION_FILE = "/proc/sys/kernel/osrelease" name = "media-relay" fullname = "MediaProxy Relay" description = "MediaProxy Relay component" default_pid = mediaproxy.runtime_directory + '/relay.pid' parser = OptionParser(version="%%prog %s" % mediaproxy.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)") parser.add_option("--pid", dest="pid_file", default=default_pid, help="pid file (%s)" % default_pid, metavar="File") (options, args) = parser.parse_args() if not sys.platform.startswith('linux'): log.fatal("Cannot start %s. A Linux host is required for operation." % fullname) sys.exit(1) try: subprocess.call(['modprobe', 'ip_tables'], env={'PATH': '/usr/sbin:/sbin:/usr/bin:/bin'}) except OSError, e: log.fatal("Cannot start %s: failed to load the ip_tables kernel module: %s" % (fullname, e)) sys.exit(1) try: kernel_version = Version.parse(open(KERNEL_VERSION_FILE).read().strip()) except (OSError, IOError, ValueError): log.fatal("Could not determine Linux kernel version") sys.exit(1) if kernel_version < Version(2, 6, 18): log.fatal("Linux kernel version 2.6.18 or newer is required to run the media relay") sys.exit(1) try: ip_forward = bool(int(open(IP_FORWARD_FILE).read())) except (OSError, IOError, ValueError): ip_forward = False if not ip_forward: log.fatal("IP forwarding is not available or not enabled (check %s)" % IP_FORWARD_FILE) sys.exit(1) try: with open(CONNTRACK_ACCT_FILE, 'w') as acct_file: acct_file.write("1") except (IOError, OSError), e: if e.errno != errno.ENOENT: log.fatal("Could not enable conntrack rule counters (check %s): %s" % (CONNTRACK_ACCT_FILE, e)) sys.exit(1) pid_file = options.pid_file - process.system_config_directory = mediaproxy.system_config_directory + process.system_config_directory = mediaproxy.config_directory config_file = ConfigFile(mediaproxy.configuration_filename) log.level.current = config_file.get_setting("Relay", 'log_level', type=datatypes.LogLevel, default=log.level.DEBUG) try: process.runtime_directory = mediaproxy.runtime_directory except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) if options.fork: try: process.daemonize(pid_file) except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) log.start_syslog(name) log.msg("Starting %s %s" % (fullname, mediaproxy.__version__)) try: from mediaproxy.relay import MediaRelay if not options.fork: from application.debug.memory import memory_dump relay = MediaRelay() except Exception, e: log.fatal("failed to create %s: %s" % (fullname, e)) if e.__class__ is not RuntimeError: log.err() sys.exit(1) relay.run() if not options.fork: #from application.debug.memory import memory_dump memory_dump() diff --git a/mediaproxy/__init__.py b/mediaproxy/__init__.py index 7182d30..b6ef332 100644 --- a/mediaproxy/__init__.py +++ b/mediaproxy/__init__.py @@ -1,10 +1,8 @@ """Mediaproxy implements a media relay for SIP calls""" __version__ = "2.6.6" -system_config_directory = '/etc/mediaproxy' +config_directory = '/etc/mediaproxy' runtime_directory = '/var/run/mediaproxy' - configuration_file = 'config.ini' - diff --git a/test/common.py b/test/common.py index a2ede75..d51d750 100644 --- a/test/common.py +++ b/test/common.py @@ -1,214 +1,214 @@ # Copyright (C) 2008 AG Projects # import sys sys.path.append(".") sys.path.append("..") import os import random import string import struct import mediaproxy from application.system import host from application.configuration import * from application.process import process -process.system_config_directory = mediaproxy.system_config_directory +process.system_config_directory = mediaproxy.config_directory from twisted.internet.protocol import DatagramProtocol, ClientFactory from twisted.protocols.basic import LineOnlyReceiver from twisted.internet.task import LoopingCall from twisted.internet.defer import Deferred, DeferredList, succeed from twisted.internet import reactor from mediaproxy.headers import EncodingDict class Config(ConfigSection): __cfgfile__ = mediaproxy.configuration_file __section__ = 'Dispatcher' socket = "/var/run/mediaproxy/dispatcher.sock" random_data = os.urandom(512) stun_data = struct.pack("!HHIIII", 0x0001, 0, 0x2112A442, 0, 0, 0) default_host_ip = host.default_ip class OpenSIPSControlClientProtocol(LineOnlyReceiver): def __init__(self): self.defer = None def lineReceived(self, line): if line == "error": print "got error from dispatcher!" reactor.stop() elif self.defer is not None: print "got ip/ports from dispatcher: %s" % line ip, ports = line.split(" ", 1) defer = self.defer self.defer = None defer.callback((ip, [int(i) for i in ports.split()])) else: print "got reply from dispatcher: %s" % line defer = self.defer self.defer = None defer.callback(line) def _send_command(self, command, headers): self.defer = Deferred() data = "\r\n".join([command] + ["%s: %s" % item for item in headers.iteritems()] + ["", ""]) #print "writing on socket:\n%s" % data self.transport.write(data) return self.defer def update(self, **kw_args): return self._send_command("update", EncodingDict(kw_args)) def remove(self, **kw_args): return self._send_command("remove", EncodingDict(kw_args)) class OpenSIPSConnectorFactory(ClientFactory): protocol = OpenSIPSControlClientProtocol def __init__(self): self.defer = Deferred() def buildProtocol(self, addr): prot = ClientFactory.buildProtocol(self, addr) reactor.callLater(0, self.defer.callback, prot) return prot class MediaReceiverProtocol(DatagramProtocol): def __init__(self, endpoint, index): self.endpoint = endpoint self.index = index self.loop = None self.received_media = False self.defer = Deferred() def datagramReceived(self, data, (host, port)): if not self.received_media: self.received_media = True print "received media %d for %s from %s:%d" % (self.index, self.endpoint.name, host, port) self.defer.callback(None) def connectionRefused(self): print "connection refused for media %d for %s" % (self.index, self.endpoint.name) class Endpoint(object): def __init__(self, sip_uri, user_agent, is_caller): if is_caller: self.name = "caller" else: self.name = "callee" self.sip_uri = sip_uri self.user_agent = user_agent self.tag = "".join(random.sample(string.ascii_lowercase, 8)) self.connectors = [] self.media = [] self.cseq = 1 def set_media(self, media): assert(len(self.connectors) == 0) self.media = media for index, (media_type, port, direction, parameters) in enumerate(self.media): if port != 0: protocol = MediaReceiverProtocol(self, index) connector = reactor.listenUDP(port, protocol) else: connector = None self.connectors.append(connector) return DeferredList([connector.protocol.defer for connector in self.connectors if connector is not None]) def get_media(self, use_old_hold): if use_old_hold: ip = "0.0.0.0" else: ip = default_host_ip return [(media_type, ip, port, direction, parameters) for media_type, port, direction, parameters in self.media] def start_media(self, ip, ports, send_stun=False): for port, connector in zip(ports, self.connectors): if connector is not None: protocol = connector.protocol if port != 0: protocol.transport.connect(ip, port) protocol.loop = LoopingCall(protocol.transport.write, send_stun and stun_data or random_data) protocol.loop.start(random.uniform(0.5, 1)) else: protocol.defer.callback(None) def stop_media(self): defers = [] for connector in self.connectors: if connector is not None: if connector.protocol.loop is not None: connector.protocol.loop.stop() connector.protocol.loop = None defer = connector.stopListening() if defer is not None: defers.append(defer) self.connectors = [] if defers: return DeferredList(defers) else: return succeed(None) class Session(object): def __init__(self, caller, callee): self.caller = caller self.callee = callee self.call_id = "".join(random.sample(string.ascii_letters, 24)) def _get_parties(self, party): party = getattr(self, party) if party is self.caller: other = self.callee else: other = self.caller return party, other def do_update(self, opensips, party, type, is_final, use_old_hold=False): party, other = self._get_parties(party) if type == "request": from_tag = party.tag to_tag = other.tag from_uri = party.sip_uri to_uri = other.sip_uri cseq = party.cseq else: from_tag = other.tag to_tag = party.tag from_uri = other.sip_uri to_uri = party.sip_uri cseq = other.cseq if is_final: defer = opensips.update(call_id = self.call_id, from_tag = from_tag, to_tag = to_tag, from_uri = from_uri, to_uri = to_uri, cseq = cseq, user_agent = party.user_agent, media = party.get_media(use_old_hold), type = type, dialog_id = "12345:67890") else: defer = opensips.update(call_id = self.call_id, from_tag = from_tag, to_tag = to_tag, from_uri = from_uri, to_uri = to_uri, cseq = cseq, user_agent = party.user_agent, media = party.get_media(use_old_hold), type = type, dialog_id = "12345:67890") if is_final: if type == "request": party.cseq += 1 else: other.cseq += 1 return defer def do_remove(self, opensips, party): party, other = self._get_parties(party) opensips.remove(call_id = self.call_id, from_tag = party.tag, to_tag = other.tag) def connect_to_dispatcher(): factory = OpenSIPSConnectorFactory() connector = reactor.connectUNIX(Config.socket, factory) return connector, factory.defer