diff --git a/msrp/__init__.py b/msrp/__init__.py index 829af93..97e0206 100644 --- a/msrp/__init__.py +++ b/msrp/__init__.py @@ -1,23 +1,7 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. __version__ = "1.1.0" runtime_directory = "/var/run/msrprelay" system_config_directory = "/etc/msrprelay" configuration_filename = "config.ini" diff --git a/msrp/backend/__init__.py b/msrp/backend/__init__.py index 80f22d9..e69de29 100644 --- a/msrp/backend/__init__.py +++ b/msrp/backend/__init__.py @@ -1,16 +0,0 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. diff --git a/msrp/backend/database.py b/msrp/backend/database.py index 2a84930..28babb4 100644 --- a/msrp/backend/database.py +++ b/msrp/backend/database.py @@ -1,69 +1,53 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from sqlobject import sqlhub, connectionForURI, SQLObject, StringCol from sqlobject.dberrors import Error as SQLObjectError from application.configuration import * from twisted.internet.threads import deferToThread from msrp.digest import LoginFailed from msrp import configuration_filename class Config(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Database' cleartext_passwords = True uri = "mysql://user:pass@db/opensips" subscriber_table = "subscriber" username_col = "username" domain_col = "domain" password_col = "password" ha1_col = "ha1" class Subscribers(SQLObject): class sqlmeta: table = Config.subscriber_table username = StringCol(dbName = Config.username_col) domain = StringCol(dbName = Config.domain_col) password = StringCol(dbName = Config.password_col) ha1 = StringCol(dbName = Config.ha1_col) sqlhub.processConnection = connectionForURI(Config.uri) class Checker(object): def __init__(self): self.cleartext_passwords = Config.cleartext_passwords def _retrieve(self, col, username, domain): try: subscriber = Subscribers.selectBy(username=username, domain=domain)[0] except IndexError: raise LoginFailed("Username not found") except SQLObjectError: raise LoginFailed("Database error") return getattr(subscriber, col) def retrieve_password(self, username, domain): return deferToThread(self._retrieve, "password", username, domain) def retrieve_ha1(self, username, domain): return deferToThread(self._retrieve, "ha1", username, domain) diff --git a/msrp/backend/memory.py b/msrp/backend/memory.py index af4dded..3843218 100644 --- a/msrp/backend/memory.py +++ b/msrp/backend/memory.py @@ -1,39 +1,23 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from application.configuration import * from twisted.internet.defer import succeed, fail from msrp.digest import LoginFailed from msrp import configuration_filename config = ConfigFile(configuration_filename) config.cleartext_passwords = True config.user_db = dict(config.get_section("Memory", default=[])) class Checker(object): def __init__(self): self.cleartext_passwords = config.cleartext_passwords def retrieve_password(self, username, domain): key = "%s@%s" % (username, domain) if key in config.user_db: return succeed(config.user_db[key]) else: return fail(LoginFailed("Username not found")) diff --git a/msrp/backend/sipthor.py b/msrp/backend/sipthor.py index 29cab58..7eb62f7 100644 --- a/msrp/backend/sipthor.py +++ b/msrp/backend/sipthor.py @@ -1,144 +1,128 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import cjson import signal from application import log from application.configuration import ConfigSection, ConfigSetting from application.python.types import Singleton from application.system import host from application.process import process from gnutls.interfaces.twisted import X509Credentials from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL from sqlobject import sqlhub, connectionForURI, SQLObject, StringCol, BLOBCol from sqlobject.dberrors import Error as SQLObjectError from twisted.internet.threads import deferToThread from msrp import configuration_filename, __version__ from msrp.digest import LoginFailed from msrp.tls import Certificate, PrivateKey from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.tls import X509NameValidator class Config(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'SIPThor' cleartext_passwords = True uri = "mysql://user:pass@db/sipthor" subscriber_table = "sip_accounts" username_col = "username" domain_col = "domain" profile_col = "profile" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) class ThorNetworkConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' domain = "sipthor-domain" passport = X509NameValidator('O:undefined, OU:undefined') class Subscribers(SQLObject): class sqlmeta: table = Config.subscriber_table username = StringCol(dbName = Config.username_col) domain = StringCol(dbName = Config.domain_col) profile = BLOBCol(dbName = Config.profile_col) sqlhub.processConnection = connectionForURI(Config.uri) class ThorNetworkService(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): self.node = ThorEntity(host.default_ip, ['msrprelay_server'], version=__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(Config.certificate, Config.private_key, [Config.ca]) credentials.verify_peer = True credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) EventServiceClient.__init__(self, ThorNetworkConfig.domain, credentials) process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP) process.signals.add_handler(signal.SIGINT, self._handle_SIGINT) process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM) def handle_event(self, event): # print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role for role in ["msrprelay_server"]: try: network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument except KeyError: from thor import network as thor_network network = thor_network.new(Config.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) #print "Thor %s nodes: %s" % (role, str(network.nodes)) class Checker(object): def __init__(self): self.cleartext_passwords = Config.cleartext_passwords self._thor_service = ThorNetworkService() def _retrieve(self, col, username, domain): try: subscriber = Subscribers.selectBy(username=username, domain=domain)[0] except IndexError: raise LoginFailed("Username not found") except SQLObjectError: raise LoginFailed("Database error") try: profile = cjson.decode(subscriber.profile) except cjson.DecodeError: raise LoginFailed("Database JSON error") try: return profile[col] except KeyError: raise LoginFailed("Database profile error") def retrieve_password(self, username, domain): return deferToThread(self._retrieve, "password", username, domain) def retrieve_ha1(self, username, domain): return deferToThread(self._retrieve, "ha1", username, domain) diff --git a/msrp/digest.py b/msrp/digest.py index 5ac71df..8e96ad1 100644 --- a/msrp/digest.py +++ b/msrp/digest.py @@ -1,126 +1,110 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from base64 import b64encode, b64decode from hashlib import md5 from os import urandom from time import time class LoginFailed(Exception): pass def calc_ha1(**parameters): ha1_text = "%(username)s:%(realm)s:%(password)s" % parameters return md5(ha1_text).hexdigest() def calc_ha2_response(**parameters): ha2_text = "%(method)s:%(uri)s" % parameters return md5(ha2_text).hexdigest() def calc_ha2_rspauth(**parameters): ha2_text = ":%(uri)s" % parameters return md5(ha2_text).hexdigest() def calc_hash(**parameters): hash_text = "%(ha1)s:%(nonce)s:%(nc)s:%(cnonce)s:auth:%(ha2)s" % parameters return md5(hash_text).hexdigest() def calc_responses(**parameters): if parameters.has_key("ha1"): ha1 = parameters.pop("ha1") else: ha1 = calc_ha1(**parameters) ha2_response = calc_ha2_response(**parameters) ha2_rspauth = calc_ha2_rspauth(**parameters) response = calc_hash(ha1 = ha1, ha2 = ha2_response, **parameters) rspauth = calc_hash(ha1 = ha1, ha2 = ha2_rspauth, **parameters) return response, rspauth def process_www_authenticate(username, password, method, uri, **parameters): nc = "00000001" cnonce = urandom(16).encode("hex") parameters["username"] = username parameters["password"] = password parameters["method"] = method parameters["uri"] = uri response, rsp_auth = calc_responses(nc = nc, cnonce = cnonce, **parameters) authorization = {} authorization["username"] = username authorization["realm"] = parameters["realm"] authorization["nonce"] = parameters["nonce"] authorization["qop"] = "auth" authorization["nc"] = nc authorization["cnonce"] = cnonce authorization["response"] = response authorization["opaque"] = parameters["opaque"] return authorization, rsp_auth class AuthChallenger(object): def __init__(self, expire_time): self.expire_time = expire_time self.key = urandom(16) def generate_www_authenticate(self, realm, peer_ip): www_authenticate = {} www_authenticate["realm"] = realm www_authenticate["qop"] = "auth" nonce = urandom(16) + "%.3f:%s" % (time(), peer_ip) www_authenticate["nonce"] = b64encode(nonce) opaque = md5(nonce + self.key) www_authenticate["opaque"] = opaque.hexdigest() return www_authenticate def process_authorization_ha1(self, ha1, method, uri, peer_ip, **parameters): parameters["method"] = method parameters["uri"] = uri try: nonce = parameters["nonce"] opaque = parameters["opaque"] response = parameters["response"] except IndexError, e: raise LoginFailed("Parameter not present: %s", e.message) try: expected_response, rspauth = calc_responses(ha1 = ha1, **parameters) except: raise #raise LoginFailed("Parameters error") if response != expected_response: raise LoginFailed("Incorrect password") try: nonce_dec = b64decode(nonce) issued, nonce_ip = nonce_dec[16:].split(":", 1) issued = float(issued) except: raise LoginFailed("Could not decode nonce") if nonce_ip != peer_ip: raise LoginFailed("This challenge was not issued to you") expected_opaque = md5(nonce_dec + self.key).hexdigest() if opaque != expected_opaque: raise LoginFailed("This nonce/opaque combination was not issued by me") if issued + self.expire_time < time(): raise LoginFailed("This challenge has expired") authentication_info = {} authentication_info["qop"] = "auth" authentication_info["cnonce"] = parameters["cnonce"] authentication_info["nc"] = parameters["nc"] authentication_info["rspauth"] = rspauth return authentication_info def process_authorization_password(self, password, method, uri, peer_ip, **parameters): ha1 = calc_ha1(password = password, **parameters) return self.process_authorization_ha1(ha1, method, uri, peer_ip, **parameters) diff --git a/msrp/protocol.py b/msrp/protocol.py index a3c623f..828c392 100644 --- a/msrp/protocol.py +++ b/msrp/protocol.py @@ -1,547 +1,531 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import re from collections import deque from random import getrandbits from twisted.protocols.basic import LineReceiver class MSRPError(Exception): pass class ParsingError(MSRPError): pass class HeaderParsingError(ParsingError): def __init__(self, header): self.header = header ParsingError.__init__(self, "Error parsing %s header" % header) class MSRPHeaderMeta(type): header_classes = {} def __init__(cls, name, bases, dict): type.__init__(cls, name, bases, dict) try: cls.header_classes[dict["name"]] = name except KeyError: pass def _auth_header_quote(name, value, header_name): if name == "qop": if header_name == "WWW-Authenticate": return '"%s"' % value else: return value if name in ("stale", "algorithm", "nc"): return value else: return '"%s"' % value class MSRPHeader(object): __metaclass__ = MSRPHeaderMeta def __new__(cls, name, value): if isinstance(value, str) and name in MSRPHeaderMeta.header_classes: cls = eval(MSRPHeaderMeta.header_classes[name]) return object.__new__(cls) def __init__(self, name, value): self.name = name if isinstance(value, str): self.encoded = value else: self.decoded = value def _raise_error(self): raise HeaderParsingError(self.name) def _get_encoded(self): if self._encoded is None: self._encoded = self._encode(self._decoded) return self._encoded def _set_encoded(self, encoded): self._encoded = encoded self._decoded = None encoded = property(_get_encoded, _set_encoded) def _get_decoded(self): if self._decoded is None: self._decoded = self._decode(self._encoded) return self._decoded def _set_decoded(self, decoded): self._decoded = decoded self._encoded = None decoded = property(_get_decoded, _set_decoded) def _decode(self, encoded): return encoded def _encode(self, decoded): return decoded def clone(self): return self.__class__(self.name, self.encoded) class MSRPNamedHeader(MSRPHeader): def __new__(cls, *args): if len(args) == 1: value = args[0] else: value = args[1] return MSRPHeader.__new__(cls, cls.name, value) def __init__(self, *args): if len(args) == 1: value = args[0] else: value = args[1] MSRPHeader.__init__(self, self.name, value) class URIHeader(MSRPNamedHeader): def _decode(self, encoded): try: return deque(parse_uri(uri) for uri in encoded.split(" ")) except ParsingError: self._raise_error() def _encode(self, decoded): return " ".join([str(uri) for uri in decoded]) class IntegerHeader(MSRPNamedHeader): def _decode(self, encoded): try: return int(encoded) except ValueError: self._raise_error() def _encode(self, decoded): return str(decoded) class DigestHeader(MSRPNamedHeader): def _decode(self, encoded): try: algo, params = encoded.split(" ", 1) except ValueError: self._raise_error() if algo != "Digest": self._raise_error() try: param_dict = dict((x.strip('"') for x in param.split("=", 1)) for param in params.split(", ")) except: self._raise_error() return param_dict def _encode(self, decoded): return "Digest " + ", ".join(['%s=%s' % (name, _auth_header_quote(name, value, self.name)) for name, value in decoded.iteritems()]) class ToPathHeader(URIHeader): name = "To-Path" class FromPathHeader(URIHeader): name = "From-Path" class MessageIDHeader(MSRPNamedHeader): name = "Message-ID" class SuccessReportHeader(MSRPNamedHeader): name = "Success-Report" def _decode(self, encoded): if encoded not in ["yes", "no"]: self._raise_error() return encoded class FailureReportHeader(MSRPNamedHeader): name = "Failure-Report" def _decode(self, encoded): if encoded not in ["yes", "no", "partial"]: self._raise_error() return encoded class ByteRangeHeader(MSRPNamedHeader): name = "Byte-Range" def _decode(self, encoded): try: rest, total = encoded.split("/") fro, to = rest.split("-") fro = int(fro) except ValueError: self._raise_error() try: to = int(to) except ValueError: if to != "*": self._raise_error() to = None try: total = int(total) except ValueError: if total != "*": self._raise_error() total = None return [fro, to, total] def _encode(self, decoded): fro, to, total = decoded if to is None: to = "*" if total is None: total = "*" return "%s-%s/%s" % (fro, to, total) class StatusHeader(MSRPNamedHeader): name = "Status" def _decode(self, encoded): try: namespace, rest = encoded.split(" ", 1) except ValueError: self._raise_error() if namespace != "000": self._raise_error() rest_sp = rest.split(" ", 1) try: if len(rest_sp[0]) != 3: raise ValueError code = int(rest_sp[0]) except ValueError: self._raise_error() try: comment = rest_sp[1] except IndexError: comment = None return (code, comment) def _encode(self, decoded): code, comment = decoded encoded = "000 %03d" % code if comment is not None: encoded += " %s" % comment return encoded class ExpiresHeader(IntegerHeader): name = "Expires" class MinExpiresHeader(IntegerHeader): name = "Min-Expires" class MaxExpiresHeader(IntegerHeader): name = "Max-Expires" class UsePathHeader(URIHeader): name = "Use-Path" class WWWAuthenticateHeader(DigestHeader): name = "WWW-Authenticate" class AuthorizationHeader(DigestHeader): name = "Authorization" class AuthenticationInfoHeader(MSRPNamedHeader): name = "Authentication-Info" def _decode(self, encoded): try: param_dict = dict((x.strip('"') for x in param.split("=", 1)) for param in encoded.split(", ")) except: self._raise_error() return param_dict def _encode(self, decoded): return ", ".join(['%s=%s' % (name, _auth_header_quote(name, value, self.name)) for name, value in decoded.iteritems()]) class ContentTypeHeader(MSRPNamedHeader): name = "Content-Type" class ContentIDHeader(MSRPNamedHeader): name = "Content-ID" class ContentDescriptionHeader(MSRPNamedHeader): name = "Content-Description" class ContentDispositionHeader(MSRPNamedHeader): name = "Content-Disposition" def _decode(self, encoded): try: sp = encoded.split(";") disposition = sp[0] parameters = dict(param.split("=", 1) for param in sp[1:]) except: self._raise_error() return [disposition, parameters] def _encode(self, decoded): disposition, parameters = decoded return ";".join([disposition] + ["%s=%s" % pair for pair in parameters.iteritems()]) class MSRPData(object): def __init__(self, transaction_id, method=None, code=None, comment=None, data=''): self.transaction_id = transaction_id self.method = method self.code = code self.comment = comment self.data = data self.headers = {} def __str__(self): if self.method is None: description = "MSRP response: %03d" % self.code if self.comment is not None: description += " %s" % self.comment else: description = "MSRP %s request" % self.method return description def add_header(self, header): self.headers[header.name] = header def verify_headers(self): try: # Decode To-/From-path headers first to be able to send responses self.headers["To-Path"].decoded self.headers["From-Path"].decoded except KeyError, e: raise HeaderParsingError(e.args[0]) for header in self.headers.itervalues(): header.decoded @property def failure_report(self): if "Failure-Report" in self.headers: return self.headers["Failure-Report"].decoded else: return "yes" @property def success_report(self): if "Success-Report" in self.headers: return self.headers["Success-Report"].decoded else: return "no" def encode_start(self): data = [] if self.method is not None: data.append("MSRP %(transaction_id)s %(method)s" % self.__dict__) else: data.append("MSRP %(transaction_id)s %(code)03d" % self.__dict__ + (self.comment is not None and " %s" % self.comment or "")) headers = self.headers.copy() data.append("To-Path: %s" % headers.pop("To-Path").encoded) data.append("From-Path: %s" % headers.pop("From-Path").encoded) for hnameval in [(hname, headers.pop(hname).encoded) for hname in headers.keys() if not hname.startswith("Content-")]: data.append("%s: %s" % hnameval) for hnameval in [(hname, headers.pop(hname).encoded) for hname in headers.keys() if hname != "Content-Type"]: data.append("%s: %s" % hnameval) if len(headers) > 0: data.append("Content-Type: %s" % headers["Content-Type"].encoded) data.append("") data.append("") return "\r\n".join(data) def encode_end(self, continuation): return "\r\n-------%s%s\r\n" % (self.transaction_id, continuation) def encode(self, continuation='$'): return self.encode_start() + self.data + self.encode_end(continuation) def clone(self): other = self.__class__(self.transaction_id, self.method, self.code, self.comment, self.data) for header in self.headers.itervalues(): other.add_header(header.clone()) return other class MSRPProtocol(LineReceiver): MAX_LENGTH = 16384 MAX_LINES = 64 def __init__(self): self.peer = None self._reset() def _reset(self): self.data = None self.line_count = 0 def connectionMade(self): self.peer = self.factory.get_peer(self) def lineReceived(self, line): if self.data: if len(line) == 0: self.term_buf_len = 12 + len(self.data.transaction_id) self.term_buf = "" self.term = re.compile("^(.*)\r\n-------%s([$#+])\r\n(.*)$" % re.escape(self.data.transaction_id), re.DOTALL) self.peer.data_start(self.data) self.setRawMode() else: match = self.term.match(line) if match: continuation = match.group(1) self.peer.data_start(self.data) self.peer.data_end(continuation) self._reset() else: self.line_count += 1 if self.line_count > self.MAX_LINES: self._reset() return try: hname, hval = line.split(": ", 2) except ValueError: return # let this pass silently, we'll just not read this line else: self.data.add_header(MSRPHeader(hname, hval)) else: # we received a new message try: msrp, transaction_id, rest = line.split(" ", 2) except ValueError: return # drop connection? if msrp != "MSRP": return # drop connection? method, code, comment = None, None, None rest_sp = rest.split(" ", 1) try: if len(rest_sp[0]) != 3: raise ValueError code = int(rest_sp[0]) except ValueError: # we have a request method = rest_sp[0] else: # we have a response if len(rest_sp) > 1: comment = rest_sp[1] self.data = MSRPData(transaction_id, method, code, comment) self.term = re.compile("^-------%s([$#+])$" % re.escape(transaction_id)) def lineLengthExceeded(self, line): self._reset() def rawDataReceived(self, data): match_data = self.term_buf + data match = self.term.match(match_data) if match: # we got the last data for this message contents, continuation, extra = match.groups() contents = contents[len(self.term_buf):] if contents: self.peer.write_chunk(contents) self.peer.data_end(continuation) self._reset() self.setLineMode(extra) else: self.peer.write_chunk(data) self.term_buf = match_data[-self.term_buf_len:] def connectionLost(self, reason): if self.peer: self.peer.connection_lost(reason.value) _re_uri = re.compile("^(?P.*?)://(((?P.*?)@)?(?P.*?)(:(?P[0-9]+?))?)(/(?P.*?))?;(?P.*?)(;(?P.*))?$") def parse_uri(uri_str): match = _re_uri.match(uri_str) if match is None: raise ParsingError("Cannot parse URI") uri_params = match.groupdict() if uri_params["port"] is not None: uri_params["port"] = int(uri_params["port"]) if uri_params["parameters"] is not None: try: uri_params["parameters"] = dict(param.split("=") for param in uri_params["parameters"].split(";")) except ValueError: raise ParsingError("Cannot parse URI parameters") scheme = uri_params.pop("scheme") if scheme == "msrp": uri_params["use_tls"] = False elif scheme == "msrps": uri_params["use_tls"] = True else: raise ParsingError("Invalid scheme user in URI: %s" % scheme) if uri_params["transport"] != "tcp": raise ParsingError('Invalid transport in URI, only "tcp" is accepted: %s' % uri_params["transport"]) return URI(**uri_params) class URI(object): def __init__(self, host, use_tls = False, user = None, port = None, session_id = None, transport = "tcp", parameters = None): self.use_tls = use_tls self.user = user self.host = host self.port = port self.session_id = session_id self.transport = transport if parameters is None: self.parameters = {} else: self.parameters = parameters def __str__(self): uri_str = [] if self.use_tls: uri_str.append("msrps://") else: uri_str.append("msrp://") if self.user: uri_str.extend([self.user, "@"]) uri_str.append(self.host) if self.port: uri_str.extend([":", str(self.port)]) if self.session_id: uri_str.extend(["/", self.session_id]) uri_str.extend([";", self.transport]) for key, value in self.parameters.iteritems(): uri_str.extend([";", key, "=", value]) return "".join(uri_str) def __eq__(self, other): """MSRP URI comparison according to section 6.1 of RFC 4975""" if self is other: return True if self.use_tls != other.use_tls: return False if self.host.lower() != other.host.lower(): return False if self.port != other.port: return False if self.session_id != other.session_id: return False if self.transport.lower() != other.transport.lower(): return False return True def __ne__(self, other): return not self == other def generate_transaction_id(): return '%024x' % getrandbits(96) diff --git a/msrp/relay.py b/msrp/relay.py index 1d3e427..b6d4314 100644 --- a/msrp/relay.py +++ b/msrp/relay.py @@ -1,795 +1,779 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from copy import copy from collections import deque from application import log from application.configuration import * from application.configuration.datatypes import NetworkAddress, LogLevel from application.python.types import Singleton from application.system import host from zope.interface import implements from twisted.internet.defer import maybeDeferred from twisted.internet import reactor from twisted.internet.protocol import Factory, ClientFactory from twisted.internet.interfaces import IPullProducer from gnutls.constants import * from gnutls.interfaces.twisted import X509Credentials from msrp.tls import Certificate, PrivateKey from msrp.protocol import * from msrp.digest import AuthChallenger, LoginFailed from msrp.responses import * from msrp import configuration_filename class RelayConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Relay' address = ConfigSetting(type=NetworkAddress, value=NetworkAddress("0.0.0.0:2855")) hostname = "" default_domain = "" allow_other_methods = True session_expiration_time_minimum = 60 session_expiration_time_default = 600 session_expiration_time_maximum = 3600 auth_challenge_expiration_time = 15 backend = "database" max_auth_attempts = 3 debug_notls = False log_failed_auth = False certificate = ConfigSetting(type=Certificate, value=None) key = ConfigSetting(type=PrivateKey, value=None) log_level = ConfigSetting(type=LogLevel, value=log.level.DEBUG) class Relay(object): __metaclass__ = Singleton def __init__(self): self.unbound_sessions = {} self._do_init() def _do_init(self): self.listener = None log.level.current = RelayConfig.log_level self.backend = __import__("msrp.backend.%s" % RelayConfig.backend.lower(), globals(), locals(), [""]).Checker() if not RelayConfig.debug_notls: if RelayConfig.certificate is None: raise RuntimeError("TLS certificate file is not specified in configuration or is invalid") if RelayConfig.key is None: raise RuntimeError("TLS private key file is not specified in configuration or is invalid") self.credentials = X509Credentials(RelayConfig.certificate, RelayConfig.key) self.credentials.session_params.protocols = (PROTO_TLS1_1, PROTO_TLS1_0) self.credentials.session_params.kx_algorithms = (KX_RSA,) self.credentials.session_params.ciphers = (CIPHER_AES_128_CBC,) self.credentials.session_params.mac_algorithms = (MAC_SHA1,) self.credentials.verify_peer = False if RelayConfig.hostname != "": self.hostname = RelayConfig.hostname if not RelayConfig.debug_notls: def matches(hostname, pattern): if pattern.startswith('*.'): return hostname.endswith(pattern[1:]) else: return hostname == pattern if not any(matches(self.hostname, name) for name in RelayConfig.certificate.alternative_names.dns): raise RuntimeError('The specified MSRP Relay hostname "%s" is not set as DNS subject alternative name in the TLS certificate.' % self.hostname) elif not RelayConfig.debug_notls: self.hostname = RelayConfig.certificate.alternative_names.dns[0] # Just grab the first one? elif RelayConfig.address[0] != "0.0.0.0": self.hostname = RelayConfig.address[0] else: self.hostname = host.default_ip self.auth_challenger = AuthChallenger(RelayConfig.auth_challenge_expiration_time) def _do_run(self): if RelayConfig.debug_notls: self.listener = reactor.listenTCP(RelayConfig.address[1], RelayFactory(), interface=RelayConfig.address[0]) else: self.listener = reactor.listenTLS(RelayConfig.address[1], RelayFactory(), self.credentials, interface=RelayConfig.address[0]) def run(self): self._do_run() reactor.run() def reload(self): log.debug("Reloading configuration file") RelayConfig.reset() RelayConfig.read() if not self.listener: try: self._do_init() except RuntimeError, e: log.fatal("Error reloading configuration file: %s" % e) reactor.stop() else: result = self.listener.stopListening() result.addCallback(lambda x: self._do_init()) result.addCallbacks(lambda x: self._do_run(), self._reload_failure) def _reload_failure(self, failure): failure.trap(RuntimeError) log.fatal("Error reloading configuration file: %s" % failure.value) reactor.stop() def generate_uri(self): return URI("%s" % self.hostname, port = RelayConfig.address[1], use_tls = not RelayConfig.debug_notls) class RelayFactory(Factory): protocol = MSRPProtocol noisy = False def get_peer(self, protocol): peer = Peer(protocol = protocol) return peer class ConnectingFactory(ClientFactory): protocol = MSRPProtocol noisy = False def __init__(self, peer): self.peer = peer def get_peer(self, protocol): self.peer.got_protocol(protocol) return self.peer def clientConnectionFailed(self, connector, reason): self.peer.connection_failed(reason.value) class ForwardingData(object): def __init__(self, msrpdata): self.msrpdata_received = msrpdata self.msrpdata_forward = None self.bytes_received = 0 self.data_queue = deque() self.continuation = None def __str__(self): return str(self.msrpdata_received) @property def method(self): return self.msrpdata_received.method @property def bytes_in_queue(self): return sum(len(data) for data in self.data_queue) def add_data(self, data): self.bytes_received += len(data) self.data_queue.append(data) def consume_data(self): if self.data_queue: return self.data_queue.popleft() else: return None class ForwardingSendData(ForwardingData): def __init__(self, msrpdata): super(ForwardingSendData, self).__init__(msrpdata) self.position = msrpdata.headers["Byte-Range"].decoded[0] # A session always exists of two peer instances that are associated with # eachother. # # A Peer instance has an attribute state, which can be one of the following: # # NEW: # A newly created incoming connection. If an AUTH is received and successfully # processed this creates a new session and moves the Peer into the UNBOUND # state. Alternatively, a SEND for an existing session can be received, which # directly binds the Peer with the session and moves it into ESTABLISHED. # # UNBOUND: # A newly created session which does not yet have another endpoint associated # with it. This association can occur either through a new outgoing connection # when the client that performed the AUTH sends a SEND message or when a new # SEND message is received. # # CONNECTING: # An attempt at a new outgoing connection. Messages can already be queued on it. # When the protocol is connected the Peer will move into ESTABLISHED. # # ESTABLISHED: # The two endpoints for the session are known and messages can be passed through # in either direction. When the connection to this peer is lost, the Peer will # move into DISCONNECTED, when the connection to the other peer is lost it will # move into DISCONNECTING. # # DISCONNECTING: # The connection to the other peer is lost and relay has to send REPORTs for # queued messages that require it. After this it will disconnect itself. # # DISCONNECTED: # When the peer is disconnected, which can happen at any time, inform the other # associated peer. This is always the end state. # # | | # V V # +-----+ +------------+ # +-----| NEW |---------+ +--------| CONNECTING |-----+ # | +-----+ | | +------------+ | # | | | | | # | V V V | # | +---------+ +-------------+ +---------------+ | # |<--| UNBOUND |-->| ESTABLISHED |-->| DISCONNECTING |-->| # | +---------+ +-------------+ +---------------+ | # | | | # | V | # | +--------------+ | # +---------------->| DISCONNECTED |<---------------------+ # +--------------+ class Peer(object): implements(IPullProducer) def __init__(self, session = None, protocol = None, path = None, other_peer = None): self.session = session self.protocol = protocol self.other_peer = other_peer self.path = path if self.protocol is not None: self.state = "NEW" self.auth_attempts = 0 self.invalid_timer = reactor.callLater(30, self._cb_invalid) else: self.invalid_timer = None self.state = "CONNECTING" self.send_transactions = {} self.other_transactions = {} # Other requests not REPORT or SEND self.forwarding_data = None # transmission attributes self.registered = False self.forwarding_send_request = False self.forward_send_queue = deque() self.forward_other_queue = deque() self.read_paused = False self.relay = Relay() def __str__(self): if self.session is None: address = self.protocol.transport.getPeer() return "%s:%d (%s)" % (address.host, address.port, self.state) else: return "session %s for %s@%s (%s)" % (self.session.session_id, self.session.username, self.session.realm, self.state) def log(self, log_func, msg): log_func("%s: %s" % (str(self), msg)) # called by MSRPProtocol def data_start(self, msrpdata): #self.log(log.debug, "Received headers for %s" % str(msrpdata)) if self.state == "NEW": result = maybeDeferred(self._unbound_peer_data, msrpdata) result.addErrback(self._cb_catch_response, msrpdata) else: self._bound_peer_data(msrpdata) def write_chunk(self, chunk): #self.log(log.debug, "Received %d bytes of MSRP body" % len(chunk)) if self.state == "ESTABLISHED" and self.forwarding_data: self.forwarding_data.add_data(chunk) if self.forwarding_data.method != 'SEND' and self.forwarding_data.bytes_in_queue > 10240: self.log(log.debug, "Non-SEND request's payload bigger than 10KB, closing connection") del self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] self.forwarding_data = None self.protocol.transport.loseConnection() return self.other_peer._maybe_pause_read() self.other_peer.start_sending() def data_end(self, continuation): #self.log(log.debug, "Received termination \"%s\"" % continuation) if self.state == "ESTABLISHED" and self.forwarding_data: msrpdata = self.forwarding_data.msrpdata_received if msrpdata.method == "SEND": if msrpdata.failure_report != "no": if msrpdata.failure_report == "yes": self.enqueue(ResponseOK(msrpdata).data.encode()) self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.forwarding_data.continuation = continuation else: if msrpdata.method != 'REPORT' and msrpdata.failure_report != 'no': # Keep track, for matching replies timer = reactor.callLater(30, self._cb_transaction_timeout, self.forwarding_data) self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, timer) # For methods other than SEND, assemble the chunk and don't use ForwardingData self.forwarding_data.msrpdata_forward.data = ''.join(self.forwarding_data.data_queue) self.other_peer.enqueue(self.forwarding_data.msrpdata_forward.encode(continuation)) self.other_peer.start_sending() self.forwarding_data = None def connection_lost(self, reason): self.log(log.debug, "Connection lost: %s" % reason) if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "ESTABLISHED": self.other_peer.disconnect() if self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.state = "DISCONNECTED" if self.session is not None and self is self.session.source: self.log(log.debug, "bytes sent: %d, bytes received: %d" % (self.session.upstream_bytes, self.session.downstream_bytes)) self._cleanup() # called by ConnectingFactory def connection_failed(self, reason): self.log(log.warn, "Connection failed: %s" % reason) if self.state == "CONNECTING": self.other_peer.disconnect() self._cleanup() # methods for an unbound peer def _cb_invalid(self): self.log(log.warn, "No valid MSRP message received, disconnecting") self.invalid_timer = None self.disconnect() def _cb_catch_response(self, failure, msrpdata): failure.trap(ResponseException) if msrpdata.method is None: self.log(log.warn, "Caught exception to response: %s (%s)" % (failure.value.__class__.__name__, failure.value.data.comment)) return response = failure.value.data #self.log(log.debug, "Sending response %03d (%s)" % (response.code, response.comment)) self.enqueue(response.encode()) def _unbound_peer_data(self, msrpdata): try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.log(log.warn, "Cannot send error response, path headers unreadable") return else: raise ResponseUnintelligible(msrpdata, e.args[0]) # Check if To-Path is really directed to us. to_path = msrpdata.headers["To-Path"].decoded from_path = msrpdata.headers["From-Path"].decoded if msrpdata.method == "AUTH" and len(to_path) == 1: return self._handle_auth(msrpdata) elif msrpdata.method == "SEND" and len(to_path) > 1: session_id = to_path[0].session_id try: session = self.relay.unbound_sessions[session_id] except KeyError: raise ResponseNoSession(msrpdata) self.log(log.debug, "Found matching unbound session %s" % session.session_id) self.state = "ESTABLISHED" self.invalid_timer.cancel() self.invalid_timer = None self.session = session self.path = from_path self.other_peer = self.session.source self.other_peer.got_destination(self) self._bound_peer_data(msrpdata) else: raise ResponseUnknownMethod(msrpdata) def _handle_auth(self, msrpdata): auth_challenger = self.relay.auth_challenger if RelayConfig.default_domain == "": realm = msrpdata.headers["To-Path"].decoded[0].host else: realm = RelayConfig.default_domain if not msrpdata.headers.has_key("Authorization"): # If the Authorization header is not present generate challenge data and respond. www_authenticate = auth_challenger.generate_www_authenticate(realm, self.protocol.transport.getPeer().host) raise ResponseUnauthenticated(msrpdata, headers = [WWWAuthenticateHeader(www_authenticate)]) else: authorization = msrpdata.headers["Authorization"].decoded if authorization.get("realm") != realm: raise ResponseUnauthorized(msrpdata, "realm does not match") if authorization.get("qop") != "auth": raise ResponseUnauthorized(msrpdata, "qop != auth") try: username = authorization["username"] session_id = authorization["nonce"] except KeyError, e: raise ResponseUnauthorized(msrpdata, "%s field not present in Authorization header" % e.args[0]) if self.relay.backend.cleartext_passwords: result = self.relay.backend.retrieve_password(username, realm) func = auth_challenger.process_authorization_password else: result = self.relay.backend.retrieve_ha1(username, realm) func = auth_challenger.process_authorization_ha1 result.addCallback(func, "AUTH", msrpdata.headers["To-Path"].encoded.split()[-1], self.protocol.transport.getPeer().host, **authorization) result.addCallbacks(self._cb_login_success, self._eb_login_failed, callbackArgs=[msrpdata, session_id, username, realm], errbackArgs=[msrpdata]) return result def _eb_login_failed(self, failure, msrpdata): failure.trap(LoginFailed) self.auth_attempts += 1 if RelayConfig.log_failed_auth: try: username = msrpdata.headers["Authorization"].decoded["username"] except IndexError: self.log(log.warn, "AUTH failed, no username: %s" % failure.value.args[0]) else: self.log(log.warn, 'AUTH failed for username "%s": %s' % (username, failure.value.args[0])) if self.auth_attempts == RelayConfig.max_auth_attempts: self.disconnect() else: raise ResponseUnauthorized(msrpdata, "Login failed: %s" % failure.value.args[0]) def _cb_login_success(self, authentication_info, msrpdata, session_id, username, realm): # Check the Expires header, if present. if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers = [MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers = [MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default # We got a successful AUTH request, so add a new session # and reply with the the Use-Path. self.state = "UNBOUND" self.invalid_timer.cancel() self.invalid_timer = None from_path = msrpdata.headers["From-Path"].decoded self.path = from_path self.relay.unbound_sessions[session_id] = self.session = Session(self, session_id, expire, username, realm) use_path = copy(from_path) use_path.pop() use_path = list(reversed(use_path)) uri = self.relay.generate_uri() uri.session_id = session_id use_path.append(uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire), AuthenticationInfoHeader(authentication_info)] self.log(log.debug, "AUTH succeeded, creating new session") raise ResponseOK(msrpdata, headers = headers) # methods for a unconnected peer def got_protocol(self, protocol): self.log(log.debug, "Successfully connected") self.state = "ESTABLISHED" self.protocol = protocol if self.forward_other_queue or self.forward_send_queue: self.start_sending() def got_destination(self, other_peer): self.state = "ESTABLISHED" del self.relay.unbound_sessions[self.session.session_id] self.session.destination = other_peer self.other_peer = other_peer # methods for a bound and connected peer def _bound_peer_data(self, msrpdata): try: try: msrpdata.verify_headers() except ParsingError, e: if isinstance(e, HeaderParsingError) and (e.header == "To-Path" or e.header == "From-Path"): self.log(log.error, "Cannot send error response, path headers unreadable") return else: raise ResponseUnintelligible(msrpdata, e.args[0]) to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) relay_uri = to_path.popleft() if relay_uri.session_id != self.session.session_id: raise ResponseNoSession(msrpdata, "Wrong session id on relay MSRP URI") if len(to_path) == 0 and msrpdata.method not in (None, "AUTH"): raise ResponseNoSession(msrpdata, "Non-response with me as endpoint, nowhere to relay to") for index, uri in enumerate(from_path): if uri != self.path[index]: raise ResponseNoSession(msrpdata, "From-Path does not match session source") if msrpdata.method == "AUTH": if msrpdata.headers.has_key("Expires"): expire = msrpdata.headers["Expires"].decoded if expire < RelayConfig.session_expiration_time_minimum: raise ResponseOutOfBounds(msrpdata, headers=[MinExpiresHeader(RelayConfig.session_expiration_time_minimum)]) if expire > RelayConfig.session_expiration_time_maximum: raise ResponseOutOfBounds(msrpdata, headers=[MaxExpiresHeader(RelayConfig.session_expiration_time_maximum)]) else: expire = RelayConfig.session_expiration_time_default use_path = from_path use_path.pop() use_path = list(reversed(use_path)) use_path.append(relay_uri) headers = [UsePathHeader(use_path), ExpiresHeader(expire)] self.log(log.debug, "Received refreshing AUTH") raise ResponseOK(msrpdata, headers=headers) if self.state == "UNBOUND": if msrpdata.method != "SEND": raise ResponseNoSession(msrpdata, "Non-SEND method received on unbound session") self.got_destination(Peer(path = to_path, session = self.session, other_peer = self)) uri = to_path[0] #self.log(log.debug, "Attempting to connect to %s" % str(uri)) factory = ConnectingFactory(self.other_peer) if uri.use_tls: self.other_peer.connector = reactor.connectTLS(uri.host, uri.port, factory, self.relay.credentials) else: self.other_peer.connector = reactor.connectTCP(uri.host, uri.port, factory) else: for index, uri in enumerate(to_path): if uri != self.other_peer.path[index]: raise ResponseNoSession(msrpdata, "To-Path does not match session destination") if msrpdata.method == "SEND": if not msrpdata.headers.has_key("Message-ID"): raise ResponseUnintelligible(msrpdata, "SEND received without Message-ID") if not msrpdata.headers.has_key("Byte-Range"): raise ResponseUnintelligible(msrpdata, "SEND received without Byte-Range") if msrpdata.method is None: # we got a response if msrpdata.transaction_id in self.other_peer.send_transactions: # Handle response to SEND request with Failure-Report != no forwarding_data, timer = self.other_peer.send_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() if msrpdata.code != ResponseOK.code: report = generate_report(msrpdata.code, forwarding_data, reason=msrpdata.comment) self.other_peer.enqueue(report.encode()) elif msrpdata.transaction_id in self.other_peer.other_transactions: forwarding_data, timer = self.other_peer.other_transactions.pop(msrpdata.transaction_id) if timer is not None: timer.cancel() forward = msrpdata forward.transaction_id = forwarding_data.msrpdata_received.transaction_id to_path_header = msrpdata.headers["To-Path"] to_path = to_path_header.decoded from_path_header = msrpdata.headers["From-Path"] from_path = from_path_header.decoded from_path.appendleft(to_path.popleft()) to_path_header.decoded = to_path from_path_header.decoded = from_path self.other_peer.enqueue(forward.encode()) else: self.log(log.debug, "Received response for untracked request: %s" % msrpdata) elif msrpdata.method in ("SEND", "REPORT", "NICKNAME") or RelayConfig.allow_other_methods: # Do the magic of appending the first To-Path URI to the From-Path. to_path = copy(msrpdata.headers["To-Path"].decoded) from_path = copy(msrpdata.headers["From-Path"].decoded) from_path.appendleft(to_path.popleft()) forward = MSRPData(generate_transaction_id(), method=msrpdata.method) for header in msrpdata.headers.itervalues(): forward.add_header(MSRPHeader(header.name, header.encoded)) forward.add_header(ToPathHeader(to_path)) forward.add_header(FromPathHeader(from_path)) if msrpdata.method == 'SEND': self.forwarding_data = ForwardingSendData(msrpdata) else: self.forwarding_data = ForwardingData(msrpdata) self.forwarding_data.msrpdata_forward = forward if self.forwarding_data.method == 'SEND': if msrpdata.failure_report != "no": self.send_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) self.other_peer.enqueue(self.forwarding_data) else: self.other_transactions[self.forwarding_data.msrpdata_forward.transaction_id] = (self.forwarding_data, None) else: raise ResponseUnknownMethod(msrpdata) except ResponseException, e: if msrpdata.method is None: self.log(log.debug, "Caught exception to response: %s (%s)" % (e.__class__.__name__, e.data.comment)) return response = e.data self.enqueue(response.encode()) def _cb_transaction_timeout(self, forward_data): transaction_id = forward_data.msrpdata_forward.transaction_id if forward_data.method == 'SEND': del self.send_transactions[transaction_id] if forward_data.msrpdata_received.failure_report == 'yes': report = generate_report(ResponseDownstreamTimeout.code, forward_data) self.enqueue(report.encode()) else: # We don't really need to do anything here, just remove the request from the mapping del self.other_transactions[transaction_id] def enqueue(self, msrpdata): #self.log(log.debug, "Enqueuing %s" % str(msrpdata)) if isinstance(msrpdata, ForwardingData): self.forward_send_queue.append(msrpdata) else: # a string containing a request or response self.forward_other_queue.append(msrpdata) self._maybe_pause_read() self.start_sending() def start_sending(self): if self.state not in ["CONNECTING", "DISCONNECTED"] and not self.registered: #self.log(log.debug, "Starting transmission") self.registered = True self.protocol.transport.registerProducer(self, False) def _stop_sending(self): #self.log(log.debug, "Empty queues, halting transmission") self.registered = False self.protocol.transport.unregisterProducer() if self.state == "DISCONNECTING": self.state = "DISCONNECTED" self.protocol.transport.loseConnection() def disconnect(self): #self.log(log.debug, "Disconnecting when possible") if self.invalid_timer and self.invalid_timer.active(): self.invalid_timer.cancel() self.invalid_timer = None if self.state == "NEW": self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "UNBOUND": del self.relay.unbound_sessions[self.session.session_id] self.protocol.transport.loseConnection() self.state = "DISCONNECTED" elif self.state == "CONNECTING": self.connector.disconnect() self.state = "DISCONNECTED" elif self.state == "ESTABLISHED": for forwarding_data, timer in self.send_transactions.itervalues(): if timer is not None: timer.cancel() report = generate_report(ResponseDownstreamTimeout.code, forwarding_data, reason="Session got disconnected") self.enqueue(report.encode()) self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None: timer.cancel() self.other_transactions.clear() if not self.registered: self.protocol.transport.loseConnection() self.state = "DISCONNECTED" else: self.state = "DISCONNECTING" def _cleanup(self): self.session = None self.other_peer = None self.protocol = None self.relay = None for _, timer in self.send_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.send_transactions.clear() for _, timer in self.other_transactions.itervalues(): if timer is not None and timer.active(): timer.cancel() self.other_transactions.clear() self.forward_send_queue.clear() self.forward_other_queue.clear() @property def _data_bytes(self): return sum(data.bytes_in_queue for data in self.forward_send_queue) @property def _message_count(self): return len(self.forward_other_queue) + len(self.forward_send_queue) def _maybe_pause_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and not self.other_peer.read_paused: if self._data_bytes > 1024 * 1024 or self._message_count > 50: self.other_peer.pause_read() def _maybe_resume_read(self): if self.state == "ESTABLISHED" and self.other_peer.state == "ESTABLISHED" and self.other_peer.read_paused: if self._data_bytes <= 1024 * 1024 and self._message_count <= 50: self.other_peer.resume_read() def pause_read(self): self.read_paused = True self.protocol.transport.stopReading() def resume_read(self): self.read_paused = False self.protocol.transport.startReading() # methods implemented for IPullProducer def resumeProducing(self): if self.forward_other_queue: if self.forwarding_send_request: #self.log(log.debug, "Sending other message, aborting SEND") data = self.forward_send_queue.popleft() # terminate the current chunk self._send_payload(data.msrpdata_forward.encode_end("+")) timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) if self.other_peer.forwarding_data is data: self.other_peer.forwarding_data = None # clone the forward data new_data = ForwardingSendData(data.msrpdata_received) new_data.continuation = data.continuation new_data.position = data.position new_data.data_queue, data.data_queue = data.data_queue, deque() new_data.msrpdata_forward = data.msrpdata_forward.clone() # adjust the byterange, create a new transaction new_data.msrpdata_forward.transaction_id = generate_transaction_id() byterange = new_data.msrpdata_forward.headers["Byte-Range"].decoded byterange[0] = new_data.position new_data.msrpdata_forward.headers["Byte-Range"].decoded = byterange self.forward_send_queue.appendleft(new_data) if self.other_peer.forwarding_data is None and new_data.continuation is None: self.other_peer.forwarding_data = new_data self.forwarding_send_request = False else: #self.log(log.debug, "Sending message") self._send_payload(self.forward_other_queue.popleft()) self._maybe_resume_read() elif self.forwarding_send_request: data = self.forward_send_queue[0] if data.continuation is not None and len(data.data_queue) <= 1: #self.log(log.debug, "Sending end of SEND message") self.forward_send_queue.popleft() chunk = data.consume_data() if chunk is not None: self._send_payload(chunk) self._send_payload(data.msrpdata_forward.encode_end(data.continuation)) if data.msrpdata_received.failure_report != 'no': timer = reactor.callLater(30, self.other_peer._cb_transaction_timeout, data) self.other_peer.send_transactions[data.msrpdata_forward.transaction_id] = (data, timer) self.forwarding_send_request = False self._maybe_resume_read() else: chunk = data.consume_data() if chunk is None: self._stop_sending() else: #self.log(log.debug, "Sending data for SEND message") self._send_payload(chunk) data.position += len(chunk) self._maybe_resume_read() elif self.forward_send_queue: #self.log(log.debug, "Sending headers for SEND message") data = self.forward_send_queue[0] self.forwarding_send_request = True self._send_payload(data.msrpdata_forward.encode_start()) else: self._stop_sending() def stopProducing(self): pass def _send_payload(self, data): if self.session is not None: if self is self.session.source: self.session.upstream_bytes += len(data) else: self.session.downstream_bytes += len(data) self.protocol.transport.write(data) class Session(object): def __init__(self, source, session_id, expire, username, realm): self.source = source self.destination = None self.session_id = session_id self.expire = expire self.username = username self.realm = realm self.downstream_bytes = 0 self.upstream_bytes = 0 diff --git a/msrp/responses.py b/msrp/responses.py index 2b0902b..be7befe 100644 --- a/msrp/responses.py +++ b/msrp/responses.py @@ -1,144 +1,128 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from msrp.protocol import * def generate_report(code, forwarding_data, reason=None): from_data = forwarding_data.msrpdata_received report = MSRPData(generate_transaction_id(), method = "REPORT") report.add_header(ToPathHeader(from_data.headers["From-Path"].encoded)) report.add_header(FromPathHeader([from_data.headers["To-Path"].decoded[0]])) report.add_header(StatusHeader((code, reason))) report.add_header(MessageIDHeader(from_data.headers["Message-ID"].encoded)) start, end, total = forwarding_data.msrpdata_forward.headers["Byte-Range"].decoded if forwarding_data.bytes_received: end = start + forwarding_data.bytes_received - 1 report.add_header(ByteRangeHeader([start, end, total])) return report def exception_from_data(data): try: response_exception = _response_exceptions[data.code] except KeyError: response = ResponseExceptionBase(data) response.code = data.code return response class ResponseExceptionWrapper(response_exception): def __init__(self, data): ResponseExceptionBase.__init__(self, data) return ResponseExceptionWrapper(data) class ResponseExceptionBase(MSRPError): def __init__(self, data): self.data = data def __str__(self): if self.data.comment: return "%03d %s" % (self.data.code, self.data.comment) else: return "%03d" % self.data.code class ResponseException(ResponseExceptionBase): def __init__(self, code, request_data, comment = None, headers = []): data = MSRPData(request_data.transaction_id, code = code, comment = comment) for header in headers: data.headers[header.name] = header data.add_header(ToPathHeader([request_data.headers["From-Path"].decoded[0]])) data.add_header(FromPathHeader([request_data.headers["To-Path"].decoded[0]])) ResponseExceptionBase.__init__(self, data) class ResponseOK(ResponseException): code = 200 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "OK", headers) class ResponseUnintelligible(ResponseException): code = 400 def __init__(self, request_data, comment = None, headers = []): if comment: ResponseException.__init__(self, self.code, request_data, "Request was unintelligible, please try again (%s)" % comment, headers) else: ResponseException.__init__(self, self.code, request_data, "Request was unintelligible, please try again", headers) class ResponseUnauthenticated(ResponseException): code = 401 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "Unauthenticated", headers) class ResponseUnauthorized(ResponseException): code = 403 def __init__(self, request_data, comment = None, headers = []): if comment is None: ResponseException.__init__(self, self.code, request_data, "Unauthorized to use this relay", headers) else: ResponseException.__init__(self, self.code, request_data, "Unauthorized: %s" % comment, headers) class ResponseDownstreamTimeout(ResponseException): code = 408 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "Downstream transaction timed out", headers) class ResponseAbort(ResponseException): code = 413 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "Please abort the message you are sending", headers) class ResponseUnknownMediaType(ResponseException): code = 415 def __init__(self, request_data, content_type = None, headers = []): ResponseException.__init__(self, self.code, request_data, "Unknown content type: %s" % content_type, headers) class ResponseOutOfBounds(ResponseException): code = 423 def __init__(self, request_data, parameter = None, headers = []): if parameter: ResponseException.__init__(self, self.code, request_data, "Parameter out of bounds: %s" % parameter, headers) else: ResponseException.__init__(self, self.code, request_data, "Parameter out of bounds", headers) class ResponseNoSession(ResponseException): code = 481 def __init__(self, request_data, comment = None, headers = []): if comment is None: ResponseException.__init__(self, self.code, request_data, "Indicated session does not exist, please terminate", headers) else: ResponseException.__init__(self, self.code, request_data, "Indicated session does not exist, please terminate: %s" % comment, headers) class ResponseUnknownMethod(ResponseException): code = 501 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "Unknown method: %s" % request_data.method, headers) class ResponseSessionTaken(ResponseException): code = 506 def __init__(self, request_data, headers = []): ResponseException.__init__(self, self.code, request_data, "This session is already bound to another network connection, stop sending messages", headers) _response_exceptions = dict((eval(cls_name).code,eval(cls_name)) for cls_name in globals() if cls_name.startswith("Response") and hasattr(eval(cls_name), "code")) diff --git a/msrp/tls.py b/msrp/tls.py index 7b64b06..4acfd2c 100644 --- a/msrp/tls.py +++ b/msrp/tls.py @@ -1,62 +1,47 @@ -# MSRP Relay -# Copyright (C) 2008 AG Projects -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + __all__ = ['Certificate', 'PrivateKey'] from gnutls.crypto import X509Certificate, X509PrivateKey from application.process import process class _FileError(Exception): pass def file_content(file): path = process.config_file(file) if path is None: raise _FileError("File '%s' does not exist" % file) try: f = open(path, 'rt') except: raise _FileError("File '%s' could not be open" % file) try: return f.read() finally: f.close() class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(typ, value): if isinstance(value, str): try: return X509Certificate(file_content(value)) except Exception, e: raise ValueError("Certificate file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError, 'value should be a string' class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(typ, value): if isinstance(value, str): try: return X509PrivateKey(file_content(value)) except Exception, e: raise ValueError("Private key file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError, 'value should be a string' diff --git a/msrprelay b/msrprelay index 1809de5..a4b05b5 100644 --- a/msrprelay +++ b/msrprelay @@ -1,76 +1,73 @@ #!/usr/bin/env python -# Copyright (C) 2008 AG Projects -# - """MSRP Relay""" if __name__ == "__main__": import os import sys import signal from optparse import OptionParser from application.process import process, ProcessError from application import log import msrp name = "msrprelay" fullname = "MSRP Relay" description = "An open source MSRP Relay" default_pid = os.path.join(msrp.runtime_directory, "relay.pid") default_config = os.path.join(msrp.system_config_directory , msrp.configuration_filename) parser = OptionParser(version="%%prog %s" % msrp.__version__) parser.add_option("--no-fork", action="store_false", dest="fork", default=1, help="run the process in the foreground (for debugging)") parser.add_option("--pid", dest="pid_file", default=default_pid, help='pid file ("%s")' % default_pid, metavar="File") parser.add_option("--config-file", dest="config_file", default=default_config, help='path to configuration file to read ("%s")' % msrp.configuration_filename, metavar="File") (options, args) = parser.parse_args() pid_file = options.pid_file system_config_directory, msrp.configuration_filename = os.path.split(options.config_file) if system_config_directory != "": msrp.system_config_directory = system_config_directory process.system_config_directory = msrp.system_config_directory try: process.runtime_directory = msrp.runtime_directory except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) if options.fork: try: process.daemonize(pid_file) except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) log.start_syslog(name) log.msg("Starting %s %s" % (fullname, msrp.__version__)) try: from msrp.relay import Relay if not options.fork: from application.debug.memory import * relay = Relay() except Exception, e: log.fatal("failed to create %s: %s" % (fullname, e)) if e.__class__ is not RuntimeError: log.err() sys.exit(1) process.signals.add_handler(signal.SIGHUP, lambda signum, frame: relay.reload()) try: relay.run() except Exception, e: log.fatal("failed to run %s: %s" % (fullname, e)) if e.__class__ is not RuntimeError: log.err() sys.exit(1) if not options.fork: #from application.debug.memory import * memory_dump()