diff --git a/debian/control b/debian/control index 741a525..ec6a9c3 100644 --- a/debian/control +++ b/debian/control @@ -1,16 +1,16 @@ Source: msrprelay Section: net Priority: optional Maintainer: Adrian Georgescu Uploaders: Saul Ibarra , Dan Pascu Build-Depends: debhelper (>= 7.3.5), python(>= 2.7) Standards-Version: 3.9.6 Package: msrprelay Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, python-application (>= 1.2.8), python-gnutls (>= 1.1.8), python-twisted-core (>= 2.5.0), python-twisted-names, python-sqlobject (>= 0.10.2) +Depends: ${python:Depends}, ${misc:Depends}, python-application (>= 1.2.8), python-gnutls (>= 3.0.0), python-twisted-core (>= 2.5.0), python-twisted-names, python-sqlobject (>= 0.10.2) Description: MSRP Relay, a RFC 4976 compatible IM/File transfer relay This software implements an MSRP relay, which is an extension to the MSRP protocol (RFC 4975). Its main role is to help NAT traversal of Interactive Messaging and file transfer sessions for SIP/MSRP endpoints located behind NAT. diff --git a/msrp/relay.py b/msrp/relay.py index b6d4314..702144d 100644 --- a/msrp/relay.py +++ b/msrp/relay.py @@ -1,779 +1,775 @@ from copy import copy from collections import deque from application import log from application.configuration import * from application.configuration.datatypes import NetworkAddress, LogLevel from application.python.types import Singleton from application.system import host from zope.interface import implements from twisted.internet.defer import maybeDeferred from twisted.internet import reactor from twisted.internet.protocol import Factory, ClientFactory from twisted.internet.interfaces import IPullProducer -from gnutls.constants import * -from gnutls.interfaces.twisted import X509Credentials +from gnutls.interfaces.twisted import TLSContext, X509Credentials from msrp.tls import Certificate, PrivateKey from msrp.protocol import * from msrp.digest import AuthChallenger, LoginFailed from msrp.responses import * from msrp import configuration_filename class RelayConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Relay' address = ConfigSetting(type=NetworkAddress, value=NetworkAddress("0.0.0.0:2855")) hostname = "" default_domain = "" allow_other_methods = True session_expiration_time_minimum = 60 session_expiration_time_default = 600 session_expiration_time_maximum = 3600 auth_challenge_expiration_time = 15 backend = "database" max_auth_attempts = 3 debug_notls = False log_failed_auth = False certificate = ConfigSetting(type=Certificate, value=None) key = ConfigSetting(type=PrivateKey, value=None) log_level = ConfigSetting(type=LogLevel, value=log.level.DEBUG) class Relay(object): __metaclass__ = Singleton def __init__(self): self.unbound_sessions = {} self._do_init() def _do_init(self): self.listener = None log.level.current = RelayConfig.log_level self.backend = __import__("msrp.backend.%s" % RelayConfig.backend.lower(), globals(), locals(), [""]).Checker() if not RelayConfig.debug_notls: if RelayConfig.certificate is None: raise RuntimeError("TLS certificate file is not specified in configuration or is invalid") if RelayConfig.key is None: raise RuntimeError("TLS private key file is not specified in configuration or is invalid") self.credentials = X509Credentials(RelayConfig.certificate, RelayConfig.key) - self.credentials.session_params.protocols = (PROTO_TLS1_1, PROTO_TLS1_0) - self.credentials.session_params.kx_algorithms = (KX_RSA,) - self.credentials.session_params.ciphers = (CIPHER_AES_128_CBC,) - self.credentials.session_params.mac_algorithms = (MAC_SHA1,) self.credentials.verify_peer = False + # TODO: add configuration option for configuring session parameters? -Saul if RelayConfig.hostname != "": self.hostname = RelayConfig.hostname if not RelayConfig.debug_notls: def matches(hostname, pattern): if pattern.startswith('*.'): return hostname.endswith(pattern[1:]) else: return hostname == pattern if not any(matches(self.hostname, name) for name in RelayConfig.certificate.alternative_names.dns): raise RuntimeError('The specified MSRP Relay hostname "%s" is not set as DNS subject alternative name in the TLS certificate.' % self.hostname) elif not RelayConfig.debug_notls: self.hostname = RelayConfig.certificate.alternative_names.dns[0] # Just grab the first one? elif RelayConfig.address[0] != "0.0.0.0": self.hostname = RelayConfig.address[0] else: self.hostname = host.default_ip self.auth_challenger = AuthChallenger(RelayConfig.auth_challenge_expiration_time) def _do_run(self): if RelayConfig.debug_notls: self.listener = reactor.listenTCP(RelayConfig.address[1], RelayFactory(), interface=RelayConfig.address[0]) else: - self.listener = reactor.listenTLS(RelayConfig.address[1], RelayFactory(), self.credentials, interface=RelayConfig.address[0]) + self.listener = reactor.listenTLS(RelayConfig.address[1], RelayFactory(), TLSContext(self.credentials), interface=RelayConfig.address[0]) def run(self): self._do_run() reactor.run() def reload(self): log.debug("Reloading configuration file") RelayConfig.reset() RelayConfig.read() if not self.listener: try: self._do_init() except RuntimeError, e: log.fatal("Error reloading configuration file: %s" % e) reactor.stop() else: result = self.listener.stopListening() result.addCallback(lambda x: self._do_init()) result.addCallbacks(lambda x: self._do_run(), self._reload_failure) def _reload_failure(self, failure): failure.trap(RuntimeError) log.fatal("Error reloading configuration file: %s" % failure.value) reactor.stop() def generate_uri(self): return URI("%s" % self.hostname, port = RelayConfig.address[1], use_tls = not RelayConfig.debug_notls) class RelayFactory(Factory): protocol = MSRPProtocol noisy = False def get_peer(self, protocol): peer = Peer(protocol = protocol) return peer class ConnectingFactory(ClientFactory): protocol = MSRPProtocol noisy = False def __init__(self, peer): self.peer = peer def get_peer(self, protocol): self.peer.got_protocol(protocol) return self.peer def clientConnectionFailed(self, connector, reason): self.peer.connection_failed(reason.value) class ForwardingData(object): def __init__(self, msrpdata): self.msrpdata_received = msrpdata self.msrpdata_forward = None self.bytes_received = 0 self.data_queue = deque() self.continuation = None def __str__(self): return str(self.msrpdata_received) @property def method(self): return self.msrpdata_received.method @property def bytes_in_queue(self): return sum(len(data) for data in self.data_queue) def add_data(self, data): self.bytes_received += len(data) self.data_queue.append(data) def consume_data(self): if self.data_queue: return self.data_queue.popleft() else: return None class ForwardingSendData(ForwardingData): def __init__(self, msrpdata): super(ForwardingSendData, self).__init__(msrpdata) self.position = msrpdata.headers["Byte-Range"].decoded[0] # A session always exists of two peer instances that are associated with # eachother. # # A Peer instance has an attribute state, which can be one of the following: # # NEW: # A newly created incoming connection. If an AUTH is received and successfully # processed this creates a new session and moves the Peer into the UNBOUND # state. Alternatively, a SEND for an existing session can be received, which # directly binds the Peer with the session and moves it into ESTABLISHED. # # UNBOUND: # A newly created session which does not yet have another endpoint associated # with it. This association can occur either through a new outgoing connection # when the client that performed the AUTH sends a SEND message or when a new # SEND message is received. # # CONNECTING: # An attempt at a new outgoing connection. Messages can already be queued on it. # When the protocol is connected the Peer will move into ESTABLISHED. # # ESTABLISHED: # The two endpoints for the session are known and messages can be passed through # in either direction. When the connection to this peer is lost, the Peer will # move into DISCONNECTED, when the connection to the other peer is lost it will # move into DISCONNECTING. # # DISCONNECTING: # The connection to the other peer is lost and relay has to send REPORTs for # queued messages that require it. After this it will disconnect itself. # # DISCONNECTED: # When the peer is disconnected, which can happen at any time, inform the other # associated peer. This is always the end state. # # | | # V V # +-----+ +------------+ # +-----| NEW |---------+ +--------| CONNECTING |-----+ # | +-----+ | | +------------+ | # | | | | | # | V V V | # | +---------+ +-------------+ +---------------+ | # |<--| UNBOUND |-->| ESTABLISHED |-->| DISCONNECTING |-->| # | +---------+ +-------------+ +---------------+ | # | | | # | V | # | +--------------+ | # +---------------->| DISCONNECTED |<---------------------+ # +--------------+ class Peer(object): implements(IPullProducer) def __init__(self, session = None, protocol = None, path = None, other_peer = None): self.session = session self.protocol = protocol self.other_peer = other_peer self.path = path if self.protocol is not None: self.state = "NEW" self.auth_attempts = 0 self.invalid_timer = reactor.callLater(30, self._cb_invalid) else: self.invalid_timer = None self.state = "CONNECTING" self.send_transactions = {} self.other_transactions = {} # Other requests not REPORT or SEND self.forwarding_data = None # transmission attributes self.registered = False self.forwarding_send_request = False self.forward_send_queue = deque() self.forward_other_queue = deque() self.read_paused = False self.relay = Relay() def __str__(self): if self.session is None: address = self.protocol.transport.getPeer() return "%s:%d (%s)" % (address.host, address.port, self.state) else: return "session %s for %s@%s (%s)" % (self.session.session_id, self.session.username, self.session.realm, self.state) def log(self, log_func, msg): log_func("%s: %s" % (str(self), msg)) # called by MSRPProtocol def data_start(self, msrpdata): #self.log(log.debug, "Received headers for %s" % str(msrpdata)) if self.state == "NEW": result = maybeDeferred(self._unbound_peer_data, msrpdata) result.addErrback(self._cb_catch_response, msrpdata) else: self._bound_peer_data(msrpdata) def write_chunk(self, chunk): #self.log(log.debug, "Received %d bytes of MSRP body" % len(chunk)) if self.state == "ESTABLISHED" and self.forwarding_data: self.forwarding_data.add_data(chunk) if self.forwarding_data.method != 'SEND' and self.forwarding_data.bytes_in_queue > 10240: self.log(log.debug, "Non-SEND request's payload bigger than 10KB, closing connection") del self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] self.forwarding_data = None self.protocol.transport.loseConnection() return self.other_peer._maybe_pause_read() self.other_peer.start_sending() def data_end(self, continuation): #self.log(log.debug, "Received termination \"%s\"" % continuation) if self.state == "ESTABLISHED" and self.forwarding_data: msrpdata = self.forwarding_data.msrpdata_received if msrpdata.method == "SEND": if msrpdata.failure_report != "no": if msrpdata.failure_report == "yes": self.enqueue(ResponseOK(msrpdata).data.encode()) self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.forwarding_data.continuation = continuation else: if msrpdata.method != 'REPORT' and msrpdata.failure_report != 'no': # Keep track, for matching replies timer = reactor.callLater(30, self._cb_transaction_timeout, self.forwarding_data) self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, timer) # For methods other than SEND, assemble the chunk and don't use ForwardingData self.forwarding_data.msrpdata_forward.data = ''.join(self.forwarding_data.data_queue) self.other_peer.enqueue(self.forwarding_data.msrpdata_forward.encode(continuation)) self.other_peer.start_sending() self.forwarding_data = None def connection_lost(self, reason): self.log(log.debug, "Connection lost: %s" % reason) if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "ESTABLISHED": self.other_peer.disconnect() if self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.state = "DISCONNECTED" if self.session is not None and self is self.session.source: self.log(log.debug, "bytes sent: %d, bytes received: %d" % (self.session.upstream_bytes, self.session.downstream_bytes)) self._cleanup() # called by ConnectingFactory def connection_failed(self, reason): self.log(log.warn, "Connection failed: %s" % reason) if self.state == "CONNECTING": self.other_peer.disconnect() self._cleanup() # methods for an unbound peer def _cb_invalid(self): self.log(log.warn, "No valid MSRP message received, disconnecting") self.invalid_timer = None self.disconnect() def _cb_catch_response(self, failure, msrpdata): failure.trap(ResponseException) if msrpdata.method is None: self.log(log.warn, "Caught exception to response: %s (%s)" % (failure.value.__class__.__name__, failure.value.data.comment)) return response = failure.value.data #self.log(log.debug, "Sending response %03d (%s)" % (response.code, response.comment)) self.enqueue(response.encode()) def _unbound_peer_data(self, msrpdata): try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.log(log.warn, "Cannot send error response, path headers unreadable") return else: raise ResponseUnintelligible(msrpdata, e.args[0]) # Check if To-Path is really directed to us. to_path = msrpdata.headers["To-Path"].decoded from_path = msrpdata.headers["From-Path"].decoded if msrpdata.method == "AUTH" and len(to_path) == 1: return self._handle_auth(msrpdata) elif msrpdata.method == "SEND" and len(to_path) > 1: session_id = to_path[0].session_id try: session = self.relay.unbound_sessions[session_id] except KeyError: raise ResponseNoSession(msrpdata) self.log(log.debug, "Found matching unbound session %s" % session.session_id) self.state = "ESTABLISHED" self.invalid_timer.cancel() self.invalid_timer = None self.session = session self.path = from_path self.other_peer = self.session.source self.other_peer.got_destination(self) self._bound_peer_data(msrpdata) else: raise ResponseUnknownMethod(msrpdata) def _handle_auth(self, msrpdata): auth_challenger = self.relay.auth_challenger if RelayConfig.default_domain == "": realm = msrpdata.headers["To-Path"].decoded[0].host else: realm = RelayConfig.default_domain if not msrpdata.headers.has_key("Authorization"): # If the Authorization header is not present generate challenge data and respond. www_authenticate = auth_challenger.generate_www_authenticate(realm, self.protocol.transport.getPeer().host) raise ResponseUnauthenticated(msrpdata, headers = [WWWAuthenticateHeader(www_authenticate)]) else: authorization = msrpdata.headers["Authorization"].decoded if authorization.get("realm") != realm: raise ResponseUnauthorized(msrpdata, "realm does not match") if authorization.get("qop") != "auth": raise ResponseUnauthorized(msrpdata, "qop != auth") try: username = authorization["username"] session_id = authorization["nonce"] except KeyError, e: raise ResponseUnauthorized(msrpdata, "%s field not present in Authorization header" % e.args[0]) if self.relay.backend.cleartext_passwords: result = self.relay.backend.retrieve_password(username, realm) func = auth_challenger.process_authorization_password else: result = self.relay.backend.retrieve_ha1(username, realm) func = auth_challenger.process_authorization_ha1 result.addCallback(func, "AUTH", msrpdata.headers["To-Path"].encoded.split()[-1], self.protocol.transport.getPeer().host, **authorization) result.addCallbacks(self._cb_login_success, self._eb_login_failed, callbackArgs=[msrpdata, session_id, username, realm], errbackArgs=[msrpdata]) return result def _eb_login_failed(self, failure, msrpdata): failure.trap(LoginFailed) self.auth_attempts += 1 if RelayConfig.log_failed_auth: try: username = msrpdata.headers["Authorization"].decoded["username"] except IndexError: self.log(log.warn, "AUTH failed, no username: %s" % failure.value.args[0]) else: self.log(log.warn, 'AUTH failed for username "%s": %s' % (username, failure.value.args[0])) if self.auth_attempts == RelayConfig.max_auth_attempts: self.disconnect() else: raise ResponseUnauthorized(msrpdata, "Login failed: %s" % failure.value.args[0]) def _cb_login_success(self, authentication_info, msrpdata, session_id, username, realm): # Check the Expires header, if present. if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers = [MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers = [MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default # We got a successful AUTH request, so add a new session # and reply with the the Use-Path. self.state = "UNBOUND" self.invalid_timer.cancel() self.invalid_timer = None from_path = msrpdata.headers["From-Path"].decoded self.path = from_path self.relay.unbound_sessions[session_id] = self.session = Session(self, session_id, expire, username, realm) use_path = copy(from_path) use_path.pop() use_path = list(reversed(use_path)) uri = self.relay.generate_uri() uri.session_id = session_id use_path.append(uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire), AuthenticationInfoHeader(authentication_info)] self.log(log.debug, "AUTH succeeded, creating new session") raise ResponseOK(msrpdata, headers = headers) # methods for a unconnected peer def got_protocol(self, protocol): self.log(log.debug, "Successfully connected") self.state = "ESTABLISHED" self.protocol = protocol if self.forward_other_queue or self.forward_send_queue: self.start_sending() def got_destination(self, other_peer): self.state = "ESTABLISHED" del self.relay.unbound_sessions[self.session.session_id] self.session.destination = other_peer self.other_peer = other_peer # methods for a bound and connected peer def _bound_peer_data(self, msrpdata): try: try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.log(log.error, "Cannot send error response, path headers unreadable") return else: raise ResponseUnintelligible(msrpdata, e.args[0]) to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) relay_uri = to_path.popleft() if relay_uri.session_id != self.session.session_id: raise ResponseNoSession(msrpdata, "Wrong session id on relay MSRP URI") if len(to_path) == 0 and msrpdata.method not in (None, "AUTH"): raise ResponseNoSession(msrpdata, "Non-response with me as endpoint, nowhere to relay to") for index, uri in enumerate(from_path): if uri != self.path[index]: raise ResponseNoSession(msrpdata, "From-Path does not match session source") if msrpdata.method == "AUTH": if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers=[MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers=[MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default use_path = from_path use_path.pop() use_path = list(reversed(use_path)) use_path.append(relay_uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire)] self.log(log.debug, "Received refreshing AUTH") raise ResponseOK(msrpdata, headers=headers) if self.state == "UNBOUND": if msrpdata.method != "SEND": raise ResponseNoSession(msrpdata, "Non-SEND method received on unbound session") self.got_destination(Peer(path = to_path, session = self.session, other_peer = self)) uri = to_path[0] #self.log(log.debug, "Attempting to connect to %s" % str(uri)) factory = ConnectingFactory(self.other_peer) if uri.use_tls: - self.other_peer.connector = reactor.connectTLS(uri.host, uri.port, factory, self.relay.credentials) + self.other_peer.connector = reactor.connectTLS(uri.host, uri.port, factory, TLSContext(self.relay.credentials)) else: self.other_peer.connector = reactor.connectTCP(uri.host, uri.port, factory) else: for index, uri in enumerate(to_path): if uri != self.other_peer.path[index]: raise ResponseNoSession(msrpdata, "To-Path does not match session destination") if msrpdata.method == "SEND": if not msrpdata.headers.has_key("Message-ID"): raise ResponseUnintelligible(msrpdata, "SEND received without Message-ID") if not msrpdata.headers.has_key("Byte-Range"): raise ResponseUnintelligible(msrpdata, "SEND received without Byte-Range") if msrpdata.method is None: # we got a response if msrpdata.transaction_id in self.other_peer.send_transactions: # Handle response to SEND request with Failure-Report != no forwarding_data, timer = self.other_peer.send_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() if msrpdata.code != ResponseOK.code: report = generate_report(msrpdata.code, forwarding_data, reason=msrpdata.comment) self.other_peer.enqueue(report.encode()) elif msrpdata.transaction_id in self.other_peer.other_transactions: forwarding_data, timer = self.other_peer.other_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() forward = msrpdata forward.transaction_id = forwarding_data.msrpdata_received.transaction_id to_path_header = msrpdata.headers["To-Path"] to_path = to_path_header.decoded from_path_header = msrpdata.headers["From-Path"] from_path = from_path_header.decoded from_path.appendleft(to_path.popleft()) to_path_header.decoded = to_path from_path_header.decoded = from_path self.other_peer.enqueue(forward.encode()) else: self.log(log.debug, "Received response for untracked request: %s" % msrpdata) elif msrpdata.method in ("SEND", "REPORT", "NICKNAME") or RelayConfig.allow_other_methods: # Do the magic of appending the first To-Path URI to the From-Path. to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) from_path.appendleft(to_path.popleft()) forward = MSRPData(generate_transaction_id(), method=msrpdata.method) for header in msrpdata.headers.itervalues(): forward.add_header(MSRPHeader(header.name, header.encoded)) forward.add_header(ToPathHeader(to_path)) forward.add_header(FromPathHeader(from_path)) if msrpdata.method == 'SEND': self.forwarding_data = ForwardingSendData(msrpdata) else: self.forwarding_data = ForwardingData(msrpdata) self.forwarding_data.msrpdata_forward = forward if self.forwarding_data.method == 'SEND': if msrpdata.failure_report != "no": self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.other_peer.enqueue(self.forwarding_data) else: self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) else: raise ResponseUnknownMethod(msrpdata) except ResponseException, e: if msrpdata.method is None: self.log(log.debug, "Caught exception to response: %s (%s)" % (e.__class__.__name__, e.data.comment)) return response = e.data self.enqueue(response.encode()) def _cb_transaction_timeout(self, forward_data): transaction_id = forward_data.msrpdata_forward.transaction_id if forward_data.method == 'SEND': del self.send_transactions[transaction_id] if forward_data.msrpdata_received.failure_report == 'yes': report = generate_report(ResponseDownstreamTimeout.code, forward_data) self.enqueue(report.encode()) else: # We don't really need to do anything here, just remove the request from the mapping del self.other_transactions[transaction_id] def enqueue(self, msrpdata): #self.log(log.debug, "Enqueuing %s" % str(msrpdata)) if isinstance(msrpdata, ForwardingData): self.forward_send_queue.append(msrpdata) else: # a string containing a request or response self.forward_other_queue.append(msrpdata) self._maybe_pause_read() self.start_sending() def start_sending(self): if self.state not in ["CONNECTING", "DISCONNECTED"] and not self.registered: #self.log(log.debug, "Starting transmission") self.registered = True self.protocol.transport.registerProducer(self, False) def _stop_sending(self): #self.log(log.debug, "Empty queues, halting transmission") self.registered = False self.protocol.transport.unregisterProducer() if self.state == "DISCONNECTING": self.state = "DISCONNECTED" self.protocol.transport.loseConnection() def disconnect(self): #self.log(log.debug, "Disconnecting when possible") if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "NEW": self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "CONNECTING": self.connector.disconnect() self.state = "DISCONNECTED" elif self.state == "ESTABLISHED": for forwarding_data, timer in self.send_transactions.itervalues(): if timer is not None: timer.cancel() report = generate_report(ResponseDownstreamTimeout.code, forwarding_data, reason="Session got disconnected") self.enqueue(report.encode()) self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None: timer.cancel() self.other_transactions.clear() if not self.registered: self.protocol.transport.loseConnection() self.state = "DISCONNECTED" else: self.state = "DISCONNECTING" def _cleanup(self): self.session = None self.other_peer = None self.protocol = None self.relay = None for _, timer in self.send_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.other_transactions.clear() self.forward_send_queue.clear() self.forward_other_queue.clear() @property def _data_bytes(self): return sum(data.bytes_in_queue for data in self.forward_send_queue) @property def _message_count(self): return len(self.forward_other_queue) + len(self.forward_send_queue) def _maybe_pause_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and not self.other_peer.read_paused: if self._data_bytes > 1024 * 1024 or self._message_count > 50: self.other_peer.pause_read() def _maybe_resume_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and self.other_peer.read_paused: if self._data_bytes <= 1024 * 1024 and self._message_count <= 50: self.other_peer.resume_read() def pause_read(self): self.read_paused = True self.protocol.transport.stopReading() def resume_read(self): self.read_paused = False self.protocol.transport.startReading() # methods implemented for IPullProducer def resumeProducing(self): if self.forward_other_queue: if self.forwarding_send_request: #self.log(log.debug, "Sending other message, aborting SEND") data = self.forward_send_queue.popleft() # terminate the current chunk self._send_payload(data.msrpdata_forward.encode_end("+")) timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) if self.other_peer.forwarding_data is data: self.other_peer.forwarding_data = None # clone the forward data new_data = ForwardingSendData(data.msrpdata_received) new_data.continuation = data.continuation new_data.position = data.position new_data.data_queue, data.data_queue = data.data_queue, deque() new_data.msrpdata_forward = data.msrpdata_forward.clone() # adjust the byterange, create a new transaction new_data.msrpdata_forward.transaction_id = generate_transaction_id() byterange = new_data.msrpdata_forward.headers["Byte-Range"].decoded byterange[0] = new_data.position new_data.msrpdata_forward.headers["Byte-Range"].decoded = byterange self.forward_send_queue.appendleft(new_data) if self.other_peer.forwarding_data is None and new_data.continuation is None: self.other_peer.forwarding_data = new_data self.forwarding_send_request = False else: #self.log(log.debug, "Sending message") self._send_payload(self.forward_other_queue.popleft()) self._maybe_resume_read() elif self.forwarding_send_request: data = self.forward_send_queue[0] if data.continuation is not None and len(data.data_queue) <= 1: #self.log(log.debug, "Sending end of SEND message") self.forward_send_queue.popleft() chunk = data.consume_data() if chunk is not None: self._send_payload(chunk) self._send_payload(data.msrpdata_forward.encode_end(data.continuation)) if data.msrpdata_received.failure_report != 'no': timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) self.forwarding_send_request = False self._maybe_resume_read() else: chunk = data.consume_data() if chunk is None: self._stop_sending() else: #self.log(log.debug, "Sending data for SEND message") self._send_payload(chunk) data.position += len(chunk) self._maybe_resume_read() elif self.forward_send_queue: #self.log(log.debug, "Sending headers for SEND message") data = self.forward_send_queue[0] self.forwarding_send_request = True self._send_payload(data.msrpdata_forward.encode_start()) else: self._stop_sending() def stopProducing(self): pass def _send_payload(self, data): if self.session is not None: if self is self.session.source: self.session.upstream_bytes += len(data) else: self.session.downstream_bytes += len(data) self.protocol.transport.write(data) class Session(object): def __init__(self, source, session_id, expire, username, realm): self.source = source self.destination = None self.session_id = session_id self.expire = expire self.username = username self.realm = realm self.downstream_bytes = 0 self.upstream_bytes = 0 diff --git a/test/msrp_receive_file.py b/test/msrp_receive_file.py index 3a9d104..a6ac7a8 100644 --- a/test/msrp_receive_file.py +++ b/test/msrp_receive_file.py @@ -1,154 +1,151 @@ #!/usr/bin/env python import sys sys.path.append(".") sys.path.append("..") import time from base64 import b64encode from getpass import getpass from twisted.names.srvconnect import SRVConnector -from twisted.internet.protocol import ClientFactory, Protocol -from twisted.protocols.basic import LineOnlyReceiver +from twisted.internet.protocol import ClientFactory from twisted.internet import reactor -from gnutls.constants import * -from gnutls.crypto import * -from gnutls.errors import * -from gnutls.interfaces.twisted import X509Credentials +from gnutls.interfaces.twisted import TLSContext, X509Credentials from msrp.protocol import * from msrp.digest import process_www_authenticate from msrp.responses import * rand_source = open("/dev/urandom") def generate_transaction_id(): return b64encode(rand_source.read(12), "+-") class MSRPFileReceiverFactory(ClientFactory): protocol = MSRPProtocol def __init__(self, username, password, relay_uri): self.uri = URI("localhost", use_tls = True, port = 12345, session_id = b64encode(rand_source.read(12))) self.username = username self.password = password self.relay_uri = relay_uri self.byte_count = 0 self.bytes_expected = 0 self.do_on_start = None self.do_on_data = None self.do_on_end = None def get_peer(self, protocol): self.protocol = protocol reactor.callLater(0, self._send_auth1) return self def data_start(self, msrpdata): if self.do_on_start: self.do_on_start(msrpdata) def write_chunk(self, data): self.byte_count += len(data) print "received %d of %d bytes" % (self.byte_count, self.bytes_expected) if self.do_on_data: self.do_on_data(data) def data_end(self, continuation): if self.do_on_end: self.do_on_end(continuation) def connection_lost(self, reason): print "Connection lost!" def _send_auth1(self): print "Sending initial AUTH" msrpdata = MSRPData(generate_transaction_id(), method = "AUTH") msrpdata.add_header(ToPathHeader([self.relay_uri])) msrpdata.add_header(FromPathHeader([self.uri])) self.protocol.transport.write(msrpdata.encode()) self.do_on_start = self._send_auth2 def _send_auth2(self, msrpdata): print "Got challenge, sending response AUTH" auth, rsp_auth = process_www_authenticate(self.username, self.password, "AUTH", str(self.relay_uri), **msrpdata.headers["WWW-Authenticate"].decoded) msrpdata = MSRPData(generate_transaction_id(), method = "AUTH") msrpdata.add_header(ToPathHeader([self.relay_uri])) msrpdata.add_header(FromPathHeader([self.uri])) msrpdata.add_header(AuthorizationHeader(auth)) self.protocol.transport.write(msrpdata.encode()) self.do_on_start = self._get_path def _get_path(self, msrpdata): if msrpdata.code != 200: print "Failed to authenticate!" if msrpdata.comment: print msrpdata.comment self.protocol.transport.loseConnection() return sdp_path = list(reversed(msrpdata.headers["Use-Path"].decoded)) + [self.uri] print "Path to send in SDP:\n%s" % " ".join(str(uri) for uri in sdp_path) self.full_to_path = " ".join(str(uri) for uri in msrpdata.headers["Use-Path"].decoded) + " " + raw_input("Destination path: ") self.do_on_start = self._start_time def _start_time(self, msrpdata): filename = msrpdata.headers["Content-Disposition"].decoded[1]["filename"] print 'Receiving file "%s"' % filename self.outfile= open(msrpdata.headers["Content-Disposition"].decoded[1]["filename"], "wb") self.start_time = time.time() total = msrpdata.headers["Byte-Range"].decoded[2] if total: self.bytes_expected = total self.do_on_start = None self.do_on_data = self._receive_data self.do_on_end = self._quit def _receive_data(self, data): self.outfile.write(data) def _quit(self, continuation): if continuation == "$": duration = time.time() - self.start_time speed = self.byte_count / duration / 1024 if self.byte_count == self.bytes_expected: print "File transfer completed successfully." else: print "File transfer aborted prematurely!" print "Received %d bytes in %.0f seconds, (%.2f kb/s)" % (self.byte_count, duration, speed) self.protocol.transport.loseConnection() def clientConnectionFailed(self, connector, err): print "Connection failed" print err.value reactor.callLater(0, reactor.stop) def clientConnectionLost(self, connector, err): print "Connection lost" print err.value reactor.callLater(0, reactor.stop) if __name__ == "__main__": if len(sys.argv) < 2 or len(sys.argv) > 4: print "Usage: %s user@domain [relay-hostname [relay-port]]" % sys.argv[0] print "If the hostname and port are not specified, the MSRP relay will be discovered" print "through the the _msrps._tcp.domain SRV record. If a hostname is specified but" print "no port, the default port of 2855 will be used." else: username, domain = sys.argv[1].split("@", 1) cred = X509Credentials(None, None) cred.verify_peer = False + ctx = TLSContext(cred) password = getpass() if len(sys.argv) == 2: - factory = MSRPFileReceiverFactory(username, password, URI(domain, use_tls = True)) - connector = SRVConnector(reactor, "msrps", domain, factory, connectFuncName = "connectTLS", connectFuncArgs = [cred]) + factory = MSRPFileReceiverFactory(username, password, URI(domain, use_tls=True)) + connector = SRVConnector(reactor, "msrps", domain, factory, connectFuncName="connectTLS", connectFuncArgs=[ctx]) connector.connect() else: relay_host = sys.argv[2] if len(sys.argv) == 4: relay_port = int(sys.argv[3]) else: relay_port = 2855 - factory = MSRPFileReceiverFactory(username, password, URI(relay_host, port = relay_port, use_tls = True)) - reactor.connectTLS(relay_host, relay_port, factory, cred) + factory = MSRPFileReceiverFactory(username, password, URI(relay_host, port=relay_port, use_tls=True)) + reactor.connectTLS(relay_host, relay_port, factory, ctx) reactor.run() diff --git a/test/msrp_send_file.py b/test/msrp_send_file.py index 3323048..8074c5f 100644 --- a/test/msrp_send_file.py +++ b/test/msrp_send_file.py @@ -1,164 +1,161 @@ #!/usr/bin/env python import sys sys.path.append(".") sys.path.append("..") import time import os from base64 import b64encode from getpass import getpass from twisted.names.srvconnect import SRVConnector -from twisted.internet.protocol import ClientFactory, Protocol -from twisted.protocols.basic import LineOnlyReceiver +from twisted.internet.protocol import ClientFactory from twisted.internet import reactor -from gnutls.constants import * -from gnutls.crypto import * -from gnutls.errors import * -from gnutls.interfaces.twisted import X509Credentials +from gnutls.interfaces.twisted import TLSContext, X509Credentials from msrp.protocol import * from msrp.digest import process_www_authenticate from msrp.responses import * rand_source = open("/dev/urandom") BLOCK_SIZE = 64 * 1024 def generate_transaction_id(): return b64encode(rand_source.read(12), "+-") class MSRPFileSenderFactory(ClientFactory): protocol = MSRPProtocol def __init__(self, username, password, relay_uri, infile, filename): self.uri = URI("localhost", use_tls = True, port = 12345, session_id = b64encode(rand_source.read(12))) self.username = username self.password = password self.relay_uri = relay_uri self.infile = infile self.filename = filename self.byte_count = 0 self.do_on_start = None self.do_on_data = None self.do_on_end = None self.start_time = None self.complete = False def get_peer(self, protocol): self.protocol = protocol reactor.callLater(0, self._send_auth1) return self def data_start(self, msrpdata): if self.do_on_start: self.do_on_start(msrpdata) def write_chunk(self, data): self.byte_count += len(data) print "received %d bytes, total %d" % (len(data), self.byte_count) if self.do_on_data: self.do_on_data(data) def data_end(self, continuation): if self.do_on_end: self.do_on_end(continuation) def connection_lost(self, reason): print "Connection lost!" if self.complete: duration = time.time() - self.start_time speed = self.file_size / duration / 1024 print "Sent %d bytes in %.0f seconds, (%.2f kb/s)" % (self.file_size, duration, speed) else: print "File transfer was aborted prematurely." def _send_auth1(self): print "Sending initial AUTH" msrpdata = MSRPData(generate_transaction_id(), method = "AUTH") msrpdata.add_header(ToPathHeader([self.relay_uri])) msrpdata.add_header(FromPathHeader([self.uri])) self.protocol.transport.write(msrpdata.encode()) self.do_on_start = self._send_auth2 def _send_auth2(self, msrpdata): print "Got challenge, sending response AUTH" auth, rsp_auth = process_www_authenticate(self.username, self.password, "AUTH", str(self.relay_uri), **msrpdata.headers["WWW-Authenticate"].decoded) msrpdata = MSRPData(generate_transaction_id(), method = "AUTH") msrpdata.add_header(ToPathHeader([self.relay_uri])) msrpdata.add_header(FromPathHeader([self.uri])) msrpdata.add_header(AuthorizationHeader(auth)) self.protocol.transport.write(msrpdata.encode()) self.do_on_start = self._get_path def _get_path(self, msrpdata): if msrpdata.code != 200: print "Failed to authenticate!" if msrpdata.comment: print msrpdata.comment self.protocol.transport.loseConnection() return sdp_path = list(reversed(msrpdata.headers["Use-Path"].decoded)) + [self.uri] print "Path to send in SDP:\n%s" % " ".join(str(uri) for uri in sdp_path) self.full_to_path = " ".join(str(uri) for uri in msrpdata.headers["Use-Path"].decoded) + " " + raw_input("Destination path: ") print 'Starting transmission of "%s"' % self.filename self.do_on_start = None self.infile.seek(0, 2) self.file_size = self.infile.tell() self.infile.seek(0, 0) msrpdata = MSRPData(generate_transaction_id(), method = "SEND") msrpdata.add_header(FromPathHeader([self.uri])) msrpdata.add_header(MSRPHeader("To-Path", self.full_to_path)) msrpdata.add_header(MessageIDHeader("1")) msrpdata.add_header(ByteRangeHeader([1, self.file_size, self.file_size])) msrpdata.add_header(ContentTypeHeader("binary/octet-stream")) msrpdata.add_header(FailureReportHeader("no")) msrpdata.add_header(ContentDispositionHeader(["attachment", {"filename": self.filename}])) self.start_time = time.time() self.protocol.transport.write(msrpdata.encode_start()) sent = 0 for i in range(0, self.file_size, BLOCK_SIZE): print "sent %d of %d bytes" % (sent, self.file_size) sent += i data = self.infile.read(BLOCK_SIZE) self.protocol.transport.write(data) print "File transfer completed." self.complete = True self.protocol.transport.write(msrpdata.encode_end("$")) def clientConnectionFailed(self, connector, err): print "Connection failed" print err.value reactor.callLater(0, reactor.stop) def clientConnectionLost(self, connector, err): print "Connection lost" print err.value reactor.callLater(0, reactor.stop) if __name__ == "__main__": if len(sys.argv) < 3 or len(sys.argv) > 5: print "Usage: %s infile user@domain [relay-hostname [relay-port]]" % sys.argv[0] print "If the hostname and port are not specified, the MSRP relay will be discovered" print "through the the _msrps._tcp.domain SRV record. If a hostname is specified but" print "no port, the default port of 2855 will be used." else: filename = sys.argv[1] infile = open(filename, "rb") username, domain = sys.argv[2].split("@", 1) cred = X509Credentials(None, None) cred.verify_peer = False + ctx = TLSContext(cred) password = getpass() if len(sys.argv) == 3: factory = MSRPFileSenderFactory(username, password, URI(domain, use_tls = True), infile, filename.split(os.path.sep)[-1]) - connector = SRVConnector(reactor, "msrps", domain, factory, connectFuncName = "connectTLS", connectFuncArgs = [cred]) + connector = SRVConnector(reactor, "msrps", domain, factory, connectFuncName="connectTLS", connectFuncArgs=[ctx]) connector.connect() else: relay_host = sys.argv[3] if len(sys.argv) == 5: relay_port = int(sys.argv[4]) else: relay_port = 2855 factory = MSRPFileSenderFactory(username, password, URI(relay_host, port = relay_port, use_tls = True), infile, filename.split(os.path.sep)[-1]) - reactor.connectTLS(relay_host, relay_port, factory, cred) + reactor.connectTLS(relay_host, relay_port, factory, ctx) reactor.run()