diff --git a/debian/control b/debian/control index 3074971..2b5486e 100644 --- a/debian/control +++ b/debian/control @@ -1,16 +1,16 @@ Source: msrprelay Section: net Priority: optional Maintainer: Dan Pascu Uploaders: Adrian Georgescu Build-Depends: debhelper (>= 9), python(>= 2.7) Standards-Version: 3.9.8 Package: msrprelay Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 1.2.8), python-gnutls (>= 3.0.0), python-twisted-core (>= 2.5.0), python-twisted-names, python-sqlobject (>= 0.10.2) +Depends: ${python:Depends}, ${misc:Depends}, lsb-base, python-application (>= 2.8.0), python-gnutls (>= 3.0.0), python-twisted-core (>= 2.5.0), python-twisted-names, python-sqlobject (>= 0.10.2) Description: MSRP Relay, a RFC 4976 compatible IM/File transfer relay This software implements an MSRP relay, which is an extension to the MSRP protocol (RFC 4975). Its main role is to help NAT traversal of Interactive Messaging and file transfer sessions for SIP/MSRP endpoints located behind NAT. diff --git a/msrp/__init__.py b/msrp/__init__.py index 3b155de..d06f30e 100644 --- a/msrp/__init__.py +++ b/msrp/__init__.py @@ -1,7 +1,4 @@ -__version__ = "1.2.2" +__version__ = '1.2.2' -runtime_directory = "/var/run/msrprelay" -system_config_directory = "/etc/msrprelay" - -configuration_filename = "config.ini" +configuration_file = 'config.ini' diff --git a/msrp/backend/database.py b/msrp/backend/database.py index 28babb4..a211e44 100644 --- a/msrp/backend/database.py +++ b/msrp/backend/database.py @@ -1,53 +1,53 @@ from sqlobject import sqlhub, connectionForURI, SQLObject, StringCol from sqlobject.dberrors import Error as SQLObjectError from application.configuration import * from twisted.internet.threads import deferToThread from msrp.digest import LoginFailed -from msrp import configuration_filename +from msrp import configuration_file class Config(ConfigSection): - __cfgfile__ = configuration_filename + __cfgfile__ = configuration_file __section__ = 'Database' cleartext_passwords = True uri = "mysql://user:pass@db/opensips" subscriber_table = "subscriber" username_col = "username" domain_col = "domain" password_col = "password" ha1_col = "ha1" class Subscribers(SQLObject): class sqlmeta: table = Config.subscriber_table username = StringCol(dbName = Config.username_col) domain = StringCol(dbName = Config.domain_col) password = StringCol(dbName = Config.password_col) ha1 = StringCol(dbName = Config.ha1_col) sqlhub.processConnection = connectionForURI(Config.uri) class Checker(object): def __init__(self): self.cleartext_passwords = Config.cleartext_passwords def _retrieve(self, col, username, domain): try: subscriber = Subscribers.selectBy(username=username, domain=domain)[0] except IndexError: raise LoginFailed("Username not found") except SQLObjectError: raise LoginFailed("Database error") return getattr(subscriber, col) def retrieve_password(self, username, domain): return deferToThread(self._retrieve, "password", username, domain) def retrieve_ha1(self, username, domain): return deferToThread(self._retrieve, "ha1", username, domain) diff --git a/msrp/backend/memory.py b/msrp/backend/memory.py index 3843218..f59ea42 100644 --- a/msrp/backend/memory.py +++ b/msrp/backend/memory.py @@ -1,23 +1,23 @@ from application.configuration import * from twisted.internet.defer import succeed, fail from msrp.digest import LoginFailed -from msrp import configuration_filename +from msrp import configuration_file -config = ConfigFile(configuration_filename) +config = ConfigFile(configuration_file) config.cleartext_passwords = True config.user_db = dict(config.get_section("Memory", default=[])) class Checker(object): def __init__(self): self.cleartext_passwords = config.cleartext_passwords def retrieve_password(self, username, domain): key = "%s@%s" % (username, domain) if key in config.user_db: return succeed(config.user_db[key]) else: return fail(LoginFailed("Username not found")) diff --git a/msrp/backend/sipthor.py b/msrp/backend/sipthor.py index 72efb06..7e4a58b 100644 --- a/msrp/backend/sipthor.py +++ b/msrp/backend/sipthor.py @@ -1,129 +1,129 @@ import cjson import signal from application import log from application.configuration import ConfigSection, ConfigSetting from application.python.types import Singleton from application.system import host from application.process import process from gnutls.interfaces.twisted import TLSContext, X509Credentials from sqlobject import sqlhub, connectionForURI, SQLObject, StringCol, BLOBCol from sqlobject.dberrors import Error as SQLObjectError from twisted.internet.threads import deferToThread -from msrp import configuration_filename, __version__ +from msrp import configuration_file, __version__ from msrp.digest import LoginFailed from msrp.tls import Certificate, PrivateKey from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.tls import X509NameValidator class Config(ConfigSection): - __cfgfile__ = configuration_filename + __cfgfile__ = configuration_file __section__ = 'SIPThor' cleartext_passwords = True uri = "mysql://user:pass@db/sipthor" subscriber_table = "sip_accounts" username_col = "username" domain_col = "domain" profile_col = "profile" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class ThorNetworkConfig(ConfigSection): - __cfgfile__ = configuration_filename + __cfgfile__ = configuration_file __section__ = 'ThorNetwork' domain = "sipthor-domain" passport = X509NameValidator('O:undefined, OU:undefined') class Subscribers(SQLObject): class sqlmeta: table = Config.subscriber_table username = StringCol(dbName=Config.username_col) domain = StringCol(dbName=Config.domain_col) profile = BLOBCol(dbName=Config.profile_col) sqlhub.processConnection = connectionForURI(Config.uri) class ThorNetworkService(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): self.node = ThorEntity(host.default_ip, ['msrprelay_server'], version=__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(Config.certificate, Config.private_key, [Config.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) EventServiceClient.__init__(self, ThorNetworkConfig.domain, tls_context) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) def handle_event(self, event): # print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role for role in ["msrprelay_server"]: try: network = networks[role] # avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network network = thor_network.new(Config.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.info('removed %s node%s: %s', role, plural, ', '.join(removed_nodes)) if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.info('added %s node%s: %s', role, plural, ', '.join(added_nodes)) # print "Thor %s nodes: %s" % (role, str(network.nodes)) class Checker(object): def __init__(self): self.cleartext_passwords = Config.cleartext_passwords self._thor_service = ThorNetworkService() def _retrieve(self, col, username, domain): try: subscriber = Subscribers.selectBy(username=username, domain=domain)[0] except IndexError: raise LoginFailed("Username not found") except SQLObjectError: raise LoginFailed("Database error") try: profile = cjson.decode(subscriber.profile) except cjson.DecodeError: raise LoginFailed("Database JSON error") try: return profile[col] except KeyError: raise LoginFailed("Database profile error") def retrieve_password(self, username, domain): return deferToThread(self._retrieve, "password", username, domain) def retrieve_ha1(self, username, domain): return deferToThread(self._retrieve, "ha1", username, domain) diff --git a/msrp/relay.py b/msrp/relay.py index a134fe2..8be6c25 100644 --- a/msrp/relay.py +++ b/msrp/relay.py @@ -1,784 +1,784 @@ import weakref from copy import copy from collections import deque from application import log from application.configuration import * from application.configuration.datatypes import NetworkAddress, LogLevel from application.python.types import Singleton from application.system import host from zope.interface import implements from twisted.internet.defer import maybeDeferred from twisted.internet import reactor from twisted.internet.protocol import Factory, ClientFactory from twisted.internet.interfaces import IPullProducer from gnutls.interfaces.twisted import TLSContext, X509Credentials from msrp.tls import Certificate, PrivateKey from msrp.protocol import * from msrp.digest import AuthChallenger, LoginFailed from msrp.responses import * -from msrp import configuration_filename +from msrp import configuration_file class RelayConfig(ConfigSection): - __cfgfile__ = configuration_filename + __cfgfile__ = configuration_file __section__ = 'Relay' address = ConfigSetting(type=NetworkAddress, value=NetworkAddress("0.0.0.0:2855")) hostname = "" default_domain = "" allow_other_methods = True session_expiration_time_minimum = 60 session_expiration_time_default = 600 session_expiration_time_maximum = 3600 auth_challenge_expiration_time = 15 backend = "database" max_auth_attempts = 3 debug_notls = False log_failed_auth = False certificate = ConfigSetting(type=Certificate, value=None) key = ConfigSetting(type=PrivateKey, value=None) log_level = ConfigSetting(type=LogLevel, value=log.level.DEBUG) class Relay(object): __metaclass__ = Singleton def __init__(self): self.unbound_sessions = {} self._do_init() def _do_init(self): self.listener = None self.backend = __import__("msrp.backend.%s" % RelayConfig.backend.lower(), globals(), locals(), [""]).Checker() if not RelayConfig.debug_notls: if RelayConfig.certificate is None: raise RuntimeError("TLS certificate file is not specified in configuration or is invalid") if RelayConfig.key is None: raise RuntimeError("TLS private key file is not specified in configuration or is invalid") self.credentials = X509Credentials(RelayConfig.certificate, RelayConfig.key) self.credentials.verify_peer = False # TODO: add configuration option for configuring session parameters? -Saul if RelayConfig.hostname != "": self.hostname = RelayConfig.hostname if not RelayConfig.debug_notls: def matches(hostname, pattern): if pattern.startswith('*.'): return hostname.endswith(pattern[1:]) else: return hostname == pattern if not any(matches(self.hostname, name) for name in RelayConfig.certificate.alternative_names.dns): raise RuntimeError('The specified MSRP Relay hostname "%s" is not set as DNS subject alternative name in the TLS certificate.' % self.hostname) elif not RelayConfig.debug_notls: self.hostname = RelayConfig.certificate.alternative_names.dns[0] # Just grab the first one? elif RelayConfig.address[0] != "0.0.0.0": self.hostname = RelayConfig.address[0] else: self.hostname = host.default_ip self.auth_challenger = AuthChallenger(RelayConfig.auth_challenge_expiration_time) def _do_run(self): if RelayConfig.debug_notls: self.listener = reactor.listenTCP(RelayConfig.address[1], RelayFactory(), interface=RelayConfig.address[0]) else: self.listener = reactor.listenTLS(RelayConfig.address[1], RelayFactory(), TLSContext(self.credentials), interface=RelayConfig.address[0]) def run(self): self._do_run() reactor.run() def reload(self): log.debug('Reloading configuration file') RelayConfig.reset() RelayConfig.read() if not self.listener: try: self._do_init() except RuntimeError, e: log.critical('Error reloading configuration file: %s' % e) reactor.stop() else: result = self.listener.stopListening() result.addCallback(lambda x: self._do_init()) result.addCallbacks(lambda x: self._do_run(), self._reload_failure) def _reload_failure(self, failure): failure.trap(RuntimeError) log.critical('Error reloading configuration file: %s' % failure.value) reactor.stop() def generate_uri(self): return URI(self.hostname, port=RelayConfig.address[1], use_tls=not RelayConfig.debug_notls) class RelayFactory(Factory): protocol = MSRPProtocol noisy = False def get_peer(self, protocol): peer = Peer(protocol = protocol) return peer class ConnectingFactory(ClientFactory): protocol = MSRPProtocol noisy = False def __init__(self, peer): self.peer = peer def get_peer(self, protocol): self.peer.got_protocol(protocol) return self.peer def clientConnectionFailed(self, connector, reason): self.peer.connection_failed(reason.value) class ForwardingData(object): def __init__(self, msrpdata): self.msrpdata_received = msrpdata self.msrpdata_forward = None self.bytes_received = 0 self.data_queue = deque() self.continuation = None def __str__(self): return str(self.msrpdata_received) @property def method(self): return self.msrpdata_received.method @property def bytes_in_queue(self): return sum(len(data) for data in self.data_queue) def add_data(self, data): self.bytes_received += len(data) self.data_queue.append(data) def consume_data(self): if self.data_queue: return self.data_queue.popleft() else: return None class ForwardingSendData(ForwardingData): def __init__(self, msrpdata): super(ForwardingSendData, self).__init__(msrpdata) self.position = msrpdata.headers["Byte-Range"].decoded[0] class PeerLogger(log.ContextualLogger): def __init__(self, peer): super(PeerLogger, self).__init__(logger=log.get_logger()) self.peer = weakref.ref(peer) def apply_context(self, message): peer = self.peer() if message == '' or peer is None: return message # cannot apply context if there is no message or peer to provide the context elif peer.session is not None: return 'session {session.session_id} for {session.username}@{session.realm} ({state}): {message}'.format(session=peer.session, state=peer.state, message=message) elif peer.protocol is not None: return '{address.host}:{address.port} ({state}): {message}'.format(address=peer.protocol.transport.getPeer(), state=peer.state, message=message) else: return message # peer exists, but both peer.session and peer.protocol are None so there is no context to apply # A session always exists of two peer instances that are associated with # eachother. # # A Peer instance has an attribute state, which can be one of the following: # # NEW: # A newly created incoming connection. If an AUTH is received and successfully # processed this creates a new session and moves the Peer into the UNBOUND # state. Alternatively, a SEND for an existing session can be received, which # directly binds the Peer with the session and moves it into ESTABLISHED. # # UNBOUND: # A newly created session which does not yet have another endpoint associated # with it. This association can occur either through a new outgoing connection # when the client that performed the AUTH sends a SEND message or when a new # SEND message is received. # # CONNECTING: # An attempt at a new outgoing connection. Messages can already be queued on it. # When the protocol is connected the Peer will move into ESTABLISHED. # # ESTABLISHED: # The two endpoints for the session are known and messages can be passed through # in either direction. When the connection to this peer is lost, the Peer will # move into DISCONNECTED, when the connection to the other peer is lost it will # move into DISCONNECTING. # # DISCONNECTING: # The connection to the other peer is lost and relay has to send REPORTs for # queued messages that require it. After this it will disconnect itself. # # DISCONNECTED: # When the peer is disconnected, which can happen at any time, inform the other # associated peer. This is always the end state. # # | | # V V # +-----+ +------------+ # +-----| NEW |---------+ +--------| CONNECTING |-----+ # | +-----+ | | +------------+ | # | | | | | # | V V V | # | +---------+ +-------------+ +---------------+ | # |<--| UNBOUND |-->| ESTABLISHED |-->| DISCONNECTING |-->| # | +---------+ +-------------+ +---------------+ | # | | | # | V | # | +--------------+ | # +---------------->| DISCONNECTED |<---------------------+ # +--------------+ class Peer(object): implements(IPullProducer) def __init__(self, session = None, protocol = None, path = None, other_peer = None): self.session = session self.protocol = protocol self.other_peer = other_peer self.path = path if self.protocol is not None: self.state = "NEW" self.auth_attempts = 0 self.invalid_timer = reactor.callLater(30, self._cb_invalid) else: self.invalid_timer = None self.state = "CONNECTING" self.send_transactions = {} self.other_transactions = {} # Other requests not REPORT or SEND self.forwarding_data = None # transmission attributes self.registered = False self.forwarding_send_request = False self.forward_send_queue = deque() self.forward_other_queue = deque() self.read_paused = False self.relay = Relay() self.logger = PeerLogger(self) # called by MSRPProtocol def data_start(self, msrpdata): # self.logger.debug('Received headers for %s', msrpdata) if self.state == "NEW": result = maybeDeferred(self._unbound_peer_data, msrpdata) result.addErrback(self._cb_catch_response, msrpdata) else: self._bound_peer_data(msrpdata) def write_chunk(self, chunk): # self.logger.debug("Received %d bytes of MSRP body", len(chunk)) if self.state == "ESTABLISHED" and self.forwarding_data: self.forwarding_data.add_data(chunk) if self.forwarding_data.method != 'SEND' and self.forwarding_data.bytes_in_queue > 10240: self.logger.debug('Non-SEND request payload bigger than 10KB, closing connection') del self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] self.forwarding_data = None self.protocol.transport.loseConnection() return self.other_peer._maybe_pause_read() self.other_peer.start_sending() def data_end(self, continuation): # self.logger.debug('Received termination "%s"', continuation) if self.state == "ESTABLISHED" and self.forwarding_data: msrpdata = self.forwarding_data.msrpdata_received if msrpdata.method == "SEND": if msrpdata.failure_report != "no": if msrpdata.failure_report == "yes": self.enqueue(ResponseOK(msrpdata).data.encode()) self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.forwarding_data.continuation = continuation else: if msrpdata.method != 'REPORT' and msrpdata.failure_report != 'no': # Keep track, for matching replies timer = reactor.callLater(30, self._cb_transaction_timeout, self.forwarding_data) self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, timer) # For methods other than SEND, assemble the chunk and don't use ForwardingData self.forwarding_data.msrpdata_forward.data = ''.join(self.forwarding_data.data_queue) self.other_peer.enqueue(self.forwarding_data.msrpdata_forward.encode(continuation)) self.other_peer.start_sending() self.forwarding_data = None def connection_lost(self, reason): self.logger.debug('Connection lost: %s', reason) if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "ESTABLISHED": self.other_peer.disconnect() if self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.state = "DISCONNECTED" if self.session is not None and self is self.session.source: self.logger.debug('bytes sent: %d, bytes received: %d', self.session.upstream_bytes, self.session.downstream_bytes) self._cleanup() # called by ConnectingFactory def connection_failed(self, reason): self.logger.warning('Connection failed: %s', reason) if self.state == "CONNECTING": self.other_peer.disconnect() self._cleanup() # methods for an unbound peer def _cb_invalid(self): self.logger.warning('No valid MSRP message received, disconnecting') self.invalid_timer = None self.disconnect() def _cb_catch_response(self, failure, msrpdata): failure.trap(ResponseException) if msrpdata.method is None: self.logger.warning('Caught exception to response: %s (%s)', failure.value.__class__.__name__, failure.value.data.comment) return response = failure.value.data # self.logger.debug('Sending response %03d (%s)', response.code, response.comment) self.enqueue(response.encode()) def _unbound_peer_data(self, msrpdata): try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.logger.warning('Cannot send error response, path headers unreadable') return else: raise ResponseUnintelligible(msrpdata, e.args[0]) # Check if To-Path is really directed to us. to_path = msrpdata.headers["To-Path"].decoded from_path = msrpdata.headers["From-Path"].decoded if msrpdata.method == "AUTH" and len(to_path) == 1: return self._handle_auth(msrpdata) elif msrpdata.method == "SEND" and len(to_path) > 1: session_id = to_path[0].session_id try: session = self.relay.unbound_sessions[session_id] except KeyError: raise ResponseNoSession(msrpdata) self.logger.debug('Found matching unbound session %s', session.session_id) self.state = "ESTABLISHED" self.invalid_timer.cancel() self.invalid_timer = None self.session = session self.path = from_path self.other_peer = self.session.source self.other_peer.got_destination(self) self._bound_peer_data(msrpdata) else: raise ResponseUnknownMethod(msrpdata) def _handle_auth(self, msrpdata): auth_challenger = self.relay.auth_challenger if RelayConfig.default_domain == "": realm = msrpdata.headers["To-Path"].decoded[0].host else: realm = RelayConfig.default_domain if not msrpdata.headers.has_key("Authorization"): # If the Authorization header is not present generate challenge data and respond. www_authenticate = auth_challenger.generate_www_authenticate(realm, self.protocol.transport.getPeer().host) raise ResponseUnauthenticated(msrpdata, headers = [WWWAuthenticateHeader(www_authenticate)]) else: authorization = msrpdata.headers["Authorization"].decoded if authorization.get("realm") != realm: raise ResponseUnauthorized(msrpdata, "realm does not match") if authorization.get("qop") != "auth": raise ResponseUnauthorized(msrpdata, "qop != auth") try: username = authorization["username"] session_id = authorization["nonce"] except KeyError, e: raise ResponseUnauthorized(msrpdata, "%s field not present in Authorization header" % e.args[0]) if self.relay.backend.cleartext_passwords: result = self.relay.backend.retrieve_password(username, realm) func = auth_challenger.process_authorization_password else: result = self.relay.backend.retrieve_ha1(username, realm) func = auth_challenger.process_authorization_ha1 result.addCallback(func, "AUTH", msrpdata.headers["To-Path"].encoded.split()[-1], self.protocol.transport.getPeer().host, **authorization) result.addCallbacks(self._cb_login_success, self._eb_login_failed, callbackArgs=[msrpdata, session_id, username, realm], errbackArgs=[msrpdata]) return result def _eb_login_failed(self, failure, msrpdata): failure.trap(LoginFailed) self.auth_attempts += 1 if RelayConfig.log_failed_auth: try: username = msrpdata.headers["Authorization"].decoded["username"] except IndexError: self.logger.warning('AUTH failed, no username: %s', failure.value.args[0]) else: self.logger.warning('AUTH failed for username "%s": %s', username, failure.value.args[0]) if self.auth_attempts == RelayConfig.max_auth_attempts: self.disconnect() else: raise ResponseUnauthorized(msrpdata, "Login failed: %s" % failure.value.args[0]) def _cb_login_success(self, authentication_info, msrpdata, session_id, username, realm): # Check the Expires header, if present. if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers = [MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers = [MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default # We got a successful AUTH request, so add a new session # and reply with the the Use-Path. self.state = "UNBOUND" self.invalid_timer.cancel() self.invalid_timer = None from_path = msrpdata.headers["From-Path"].decoded self.path = from_path self.relay.unbound_sessions[session_id] = self.session = Session(self, session_id, expire, username, realm) use_path = copy(from_path) use_path.pop() use_path = list(reversed(use_path)) uri = self.relay.generate_uri() uri.session_id = session_id use_path.append(uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire), AuthenticationInfoHeader(authentication_info)] self.logger.debug('AUTH succeeded, creating new session') raise ResponseOK(msrpdata, headers = headers) # methods for a unconnected peer def got_protocol(self, protocol): self.logger.debug('Successfully connected') self.state = "ESTABLISHED" self.protocol = protocol if self.forward_other_queue or self.forward_send_queue: self.start_sending() def got_destination(self, other_peer): self.state = "ESTABLISHED" del self.relay.unbound_sessions[self.session.session_id] self.session.destination = other_peer self.other_peer = other_peer # methods for a bound and connected peer def _bound_peer_data(self, msrpdata): try: try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.logger.error('Cannot send error response, path headers unreadable') return else: raise ResponseUnintelligible(msrpdata, e.args[0]) to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) relay_uri = to_path.popleft() if relay_uri.session_id != self.session.session_id: raise ResponseNoSession(msrpdata, "Wrong session id on relay MSRP URI") if len(to_path) == 0 and msrpdata.method not in (None, "AUTH"): raise ResponseNoSession(msrpdata, "Non-response with me as endpoint, nowhere to relay to") for index, uri in enumerate(from_path): if uri != self.path[index]: raise ResponseNoSession(msrpdata, "From-Path does not match session source") if msrpdata.method == "AUTH": if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers=[MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers=[MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default use_path = from_path use_path.pop() use_path = list(reversed(use_path)) use_path.append(relay_uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire)] self.logger.debug('Received refreshing AUTH') raise ResponseOK(msrpdata, headers=headers) if self.state == "UNBOUND": if msrpdata.method != "SEND": raise ResponseNoSession(msrpdata, "Non-SEND method received on unbound session") self.got_destination(Peer(path = to_path, session = self.session, other_peer = self)) uri = to_path[0] # self.logger.debug('Attempting to connect to %s', uri) factory = ConnectingFactory(self.other_peer) if uri.use_tls: self.other_peer.connector = reactor.connectTLS(uri.host, uri.port, factory, TLSContext(self.relay.credentials)) else: self.other_peer.connector = reactor.connectTCP(uri.host, uri.port, factory) else: for index, uri in enumerate(to_path): if uri != self.other_peer.path[index]: raise ResponseNoSession(msrpdata, "To-Path does not match session destination") if msrpdata.method == "SEND": if not msrpdata.headers.has_key("Message-ID"): raise ResponseUnintelligible(msrpdata, "SEND received without Message-ID") if not msrpdata.headers.has_key("Byte-Range"): raise ResponseUnintelligible(msrpdata, "SEND received without Byte-Range") if msrpdata.method is None: # we got a response if msrpdata.transaction_id in self.other_peer.send_transactions: # Handle response to SEND request with Failure-Report != no forwarding_data, timer = self.other_peer.send_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() if msrpdata.code != ResponseOK.code: report = generate_report(msrpdata.code, forwarding_data, reason=msrpdata.comment) self.other_peer.enqueue(report.encode()) elif msrpdata.transaction_id in self.other_peer.other_transactions: forwarding_data, timer = self.other_peer.other_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() forward = msrpdata forward.transaction_id = forwarding_data.msrpdata_received.transaction_id to_path_header = msrpdata.headers["To-Path"] to_path = to_path_header.decoded from_path_header = msrpdata.headers["From-Path"] from_path = from_path_header.decoded from_path.appendleft(to_path.popleft()) to_path_header.decoded = to_path from_path_header.decoded = from_path self.other_peer.enqueue(forward.encode()) else: self.logger.debug('Received response for untracked request: %s', msrpdata) elif msrpdata.method in ("SEND", "REPORT", "NICKNAME") or RelayConfig.allow_other_methods: # Do the magic of appending the first To-Path URI to the From-Path. to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) from_path.appendleft(to_path.popleft()) forward = MSRPData(generate_transaction_id(), method=msrpdata.method) for header in msrpdata.headers.itervalues(): forward.add_header(MSRPHeader(header.name, header.encoded)) forward.add_header(ToPathHeader(to_path)) forward.add_header(FromPathHeader(from_path)) if msrpdata.method == 'SEND': self.forwarding_data = ForwardingSendData(msrpdata) else: self.forwarding_data = ForwardingData(msrpdata) self.forwarding_data.msrpdata_forward = forward if self.forwarding_data.method == 'SEND': if msrpdata.failure_report != "no": self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.other_peer.enqueue(self.forwarding_data) else: self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) else: raise ResponseUnknownMethod(msrpdata) except ResponseException, e: if msrpdata.method is None: self.logger.debug('Caught exception to response: %s (%s)', e.__class__.__name__, e.data.comment) return response = e.data self.enqueue(response.encode()) def _cb_transaction_timeout(self, forward_data): transaction_id = forward_data.msrpdata_forward.transaction_id if forward_data.method == 'SEND': del self.send_transactions[transaction_id] if forward_data.msrpdata_received.failure_report == 'yes': report = generate_report(ResponseDownstreamTimeout.code, forward_data) self.enqueue(report.encode()) else: # We don't really need to do anything here, just remove the request from the mapping del self.other_transactions[transaction_id] def enqueue(self, msrpdata): # self.logger.debug('Enqueuing %s', msrpdata) if isinstance(msrpdata, ForwardingData): self.forward_send_queue.append(msrpdata) else: # a string containing a request or response self.forward_other_queue.append(msrpdata) self._maybe_pause_read() self.start_sending() def start_sending(self): if self.state not in ["CONNECTING", "DISCONNECTED"] and not self.registered: # self.logger.debug('Starting transmission') self.registered = True self.protocol.transport.registerProducer(self, False) def _stop_sending(self): # self.logger.debug('Empty queues, halting transmission') self.registered = False self.protocol.transport.unregisterProducer() if self.state == "DISCONNECTING": self.state = "DISCONNECTED" self.protocol.transport.loseConnection() def disconnect(self): # self.logger.debug('Disconnecting when possible') if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "NEW": self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "CONNECTING": self.connector.disconnect() self.state = "DISCONNECTED" elif self.state == "ESTABLISHED": for forwarding_data, timer in self.send_transactions.itervalues(): if timer is not None: timer.cancel() report = generate_report(ResponseDownstreamTimeout.code, forwarding_data, reason="Session got disconnected") self.enqueue(report.encode()) self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None: timer.cancel() self.other_transactions.clear() if not self.registered: self.protocol.transport.loseConnection() self.state = "DISCONNECTED" else: self.state = "DISCONNECTING" def _cleanup(self): self.session = None self.other_peer = None self.protocol = None self.relay = None for _, timer in self.send_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.other_transactions.clear() self.forward_send_queue.clear() self.forward_other_queue.clear() @property def _data_bytes(self): return sum(data.bytes_in_queue for data in self.forward_send_queue) @property def _message_count(self): return len(self.forward_other_queue) + len(self.forward_send_queue) def _maybe_pause_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and not self.other_peer.read_paused: if self._data_bytes > 1024 * 1024 or self._message_count > 50: self.other_peer.pause_read() def _maybe_resume_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and self.other_peer.read_paused: if self._data_bytes <= 1024 * 1024 and self._message_count <= 50: self.other_peer.resume_read() def pause_read(self): self.read_paused = True self.protocol.transport.stopReading() def resume_read(self): self.read_paused = False self.protocol.transport.startReading() # methods implemented for IPullProducer def resumeProducing(self): if self.forward_other_queue: if self.forwarding_send_request: # self.logger.debug('Sending other message, aborting SEND') data = self.forward_send_queue.popleft() # terminate the current chunk self._send_payload(data.msrpdata_forward.encode_end("+")) timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) if self.other_peer.forwarding_data is data: self.other_peer.forwarding_data = None # clone the forward data new_data = ForwardingSendData(data.msrpdata_received) new_data.continuation = data.continuation new_data.position = data.position new_data.data_queue, data.data_queue = data.data_queue, deque() new_data.msrpdata_forward = data.msrpdata_forward.clone() # adjust the byterange, create a new transaction new_data.msrpdata_forward.transaction_id = generate_transaction_id() byterange = new_data.msrpdata_forward.headers["Byte-Range"].decoded byterange[0] = new_data.position new_data.msrpdata_forward.headers["Byte-Range"].decoded = byterange self.forward_send_queue.appendleft(new_data) if self.other_peer.forwarding_data is None and new_data.continuation is None: self.other_peer.forwarding_data = new_data self.forwarding_send_request = False else: # self.logger.debug('Sending message') self._send_payload(self.forward_other_queue.popleft()) self._maybe_resume_read() elif self.forwarding_send_request: data = self.forward_send_queue[0] if data.continuation is not None and len(data.data_queue) <= 1: # self.logger.debug('Sending end of SEND message') self.forward_send_queue.popleft() chunk = data.consume_data() if chunk is not None: self._send_payload(chunk) self._send_payload(data.msrpdata_forward.encode_end(data.continuation)) if data.msrpdata_received.failure_report != 'no': timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) self.forwarding_send_request = False self._maybe_resume_read() else: chunk = data.consume_data() if chunk is None: self._stop_sending() else: # self.logger.debug('Sending data for SEND message') self._send_payload(chunk) data.position += len(chunk) self._maybe_resume_read() elif self.forward_send_queue: # self.logger.debug('Sending headers for SEND message') data = self.forward_send_queue[0] self.forwarding_send_request = True self._send_payload(data.msrpdata_forward.encode_start()) else: self._stop_sending() def stopProducing(self): pass def _send_payload(self, data): if self.session is not None: if self is self.session.source: self.session.upstream_bytes += len(data) else: self.session.downstream_bytes += len(data) self.protocol.transport.write(data) class Session(object): def __init__(self, source, session_id, expire, username, realm): self.source = source self.destination = None self.session_id = session_id self.expire = expire self.username = username self.realm = realm self.downstream_bytes = 0 self.upstream_bytes = 0 diff --git a/msrp/tls.py b/msrp/tls.py index feaefa9..42bbcaf 100644 --- a/msrp/tls.py +++ b/msrp/tls.py @@ -1,48 +1,48 @@ __all__ = ['Certificate', 'PrivateKey'] from gnutls.crypto import X509Certificate, X509PrivateKey from application.process import process class _FileError(Exception): pass def file_content(file): - path = process.config_file(file) + path = process.configuration.file(file) if path is None: raise _FileError("File '%s' does not exist" % file) try: f = open(path, 'rt') except: raise _FileError("File '%s' could not be open" % file) try: return f.read() finally: f.close() class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, str): try: return X509Certificate(file_content(value)) except Exception, e: raise ValueError("Certificate file '%s' could not be loaded: %s" % (value, str(e))) else: raise TypeError('value should be a string') class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, str): try: return X509PrivateKey(file_content(value)) except Exception, e: raise ValueError("Private key file '%s' could not be loaded: %s" % (value, str(e))) else: raise TypeError('value should be a string') diff --git a/msrprelay b/msrprelay index c8171e5..ea491c7 100644 --- a/msrprelay +++ b/msrprelay @@ -1,83 +1,77 @@ #!/usr/bin/env python """MSRP Relay""" -if __name__ == "__main__": +if __name__ == '__main__': import msrp - import os import sys import signal from application import log from application.process import process, ProcessError from argparse import ArgumentParser - name = "msrprelay" - fullname = "MSRP Relay" - description = "An open source MSRP Relay" + name = 'msrprelay' + fullname = 'MSRP Relay' + description = 'An open source MSRP Relay' - default_pid = os.path.join(msrp.runtime_directory, "relay.pid") - default_config = os.path.join(msrp.system_config_directory , msrp.configuration_filename) + process.configuration.user_directory = None + process.configuration.subdirectory = name + # process.runtime.subdirectory = name parser = ArgumentParser(usage='%(prog)s [options]') parser.add_argument('--version', action='version', version='%(prog)s {}'.format(msrp.__version__)) parser.add_argument('--no-fork', action='store_false', dest='fork', help='run in the foreground and log to terminal') - parser.add_argument('--pid', dest='pid_file', default=default_pid, help='pid file when forking ({})'.format(default_pid), metavar='FILE') - parser.add_argument('--config-file', default=default_config, help='configuration file to read', metavar='FILE') + parser.add_argument('--config-dir', dest='config_directory', default=None, help='the configuration directory', metavar='PATH') + parser.add_argument('--runtime-dir', dest='runtime_directory', default=None, help='the runtime directory ({})'.format(process.runtime.directory), metavar='PATH') parser.add_argument('--debug', action='store_true', help='enable verbose logging') parser.add_argument('--debug-memory', action='store_true', help='enable memory debugging') options = parser.parse_args() log.Formatter.prefix_format = '{record.levelname:<8s} ' - pid_file = options.pid_file - system_config_directory, msrp.configuration_filename = os.path.split(options.config_file) - if system_config_directory != "": - msrp.system_config_directory = system_config_directory - - process.system_config_directory = msrp.system_config_directory - try: - process.runtime_directory = msrp.runtime_directory - except ProcessError, e: - log.critical('Cannot start %s: %s' % (fullname, e)) - sys.exit(1) + if options.config_directory is not None: + process.configuration.local_directory = options.config_directory + if options.runtime_directory is not None: + process.runtime.directory = options.runtime_directory if options.fork: try: + pid_file = '{}.pid'.format(name) process.daemonize(pid_file) except ProcessError, e: log.critical('Cannot start %s: %s' % (fullname, e)) sys.exit(1) log.use_syslog(name) log.info('Starting %s %s' % (fullname, msrp.__version__)) from msrp.relay import Relay, RelayConfig log.level.current = log.level.DEBUG if options.debug else RelayConfig.log_level if options.debug_memory: from application.debug.memory import memory_dump try: relay = Relay() except Exception, e: log.critical('Failed to create %s: %s' % (fullname, e)) if e.__class__ is not RuntimeError: log.exception() sys.exit(1) process.signals.add_handler(signal.SIGHUP, lambda signum, frame: relay.reload()) try: relay.run() except Exception, e: log.critical('Failed to run %s: %s' % (fullname, e)) if e.__class__ is not RuntimeError: log.exception() sys.exit(1) if options.debug_memory: memory_dump()