diff --git a/sylk/configuration/__init__.py b/sylk/configuration/__init__.py index 92830eb..8d3d80e 100644 --- a/sylk/configuration/__init__.py +++ b/sylk/configuration/__init__.py @@ -1,55 +1,68 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import StringList from application.system import host from sipsimple.configuration.datatypes import NonNegativeInteger, SRTPEncryption from sylk import configuration_filename from sylk.configuration.datatypes import AudioCodecs, IPAddress, Port, PortRange, SIPProxyAddress +from sylk.tls import Certificate, PrivateKey class ServerConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Server' ca_file = ConfigSetting(type=str, value='/etc/sylkserver/tls/ca.crt') certificate = ConfigSetting(type=str, value='/etc/sylkserver/tls/sylkserver.crt') verify_server = False default_application = 'conference' application_map = ConfigSetting(type=StringList, value='') trace_dir = ConfigSetting(type=str, value='/var/log/sylkserver') trace_sip = False trace_msrp = False trace_notifications = False class SIPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'SIP' local_ip = ConfigSetting(type=IPAddress, value=host.default_ip) local_udp_port = ConfigSetting(type=Port, value=5060) local_tcp_port = ConfigSetting(type=Port, value=5060) local_tls_port = ConfigSetting(type=Port, value=None) outbound_proxy = ConfigSetting(type=SIPProxyAddress, value=None) class MSRPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'MSRP' use_tls = False class RTPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'RTP' audio_codecs = ConfigSetting(type=AudioCodecs, value=None) port_range = ConfigSetting(type=PortRange, value=PortRange('50000:50500')) srtp_encryption = ConfigSetting(type=SRTPEncryption, value='optional') timeout = ConfigSetting(type=NonNegativeInteger, value=30) +class ThorNodeConfig(ConfigSection): + __cfgfile__ = configuration_filename + __section__ = 'ThorNetwork' + + enabled = False + domain = "sipthor.net" + multiply = 1000 + certificate = ConfigSetting(type=Certificate, value=None) + private_key = ConfigSetting(type=PrivateKey, value=None) + ca = ConfigSetting(type=Certificate, value=None) + + diff --git a/sylk/interfaces/__init__.py b/sylk/interfaces/__init__.py new file mode 100644 index 0000000..abdbbc0 --- /dev/null +++ b/sylk/interfaces/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2011 AG Projects. See LICENSE for details. +# + diff --git a/sylk/interfaces/sipthor.py b/sylk/interfaces/sipthor.py new file mode 100644 index 0000000..c1cc754 --- /dev/null +++ b/sylk/interfaces/sipthor.py @@ -0,0 +1,111 @@ +# Copyright (C) 2011 AG-Projects. +# +# This module is proprietary to AG Projects. Use of this module by third +# parties is not supported. + +__all__ = ['ConferenceNode'] + +from application import log +from application.notification import NotificationCenter +from application.python.util import Singleton +from eventlet.twistedutil import block_on, callInGreenThread +from gnutls.interfaces.twisted import X509Credentials +from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL +from sipsimple.util import TimestampedNotificationData + +from thor.eventservice import EventServiceClient, ThorEvent +from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity +from thor.scheduler import KeepRunning + +import sylk +from sylk.configuration import SIPConfig, ThorNodeConfig + + +class ConferenceNode(EventServiceClient): + __metaclass__ = Singleton + topics = ["Thor.Members"] + + def __init__(self): + # Needs to be called from a green thread + self.node = ThorEntity(SIPConfig.local_ip, ['conference_server'], version=sylk.__version__) + self.networks = {} + self.presence_message = ThorEvent('Thor.Presence', self.node.id) + self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) + credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) + credentials.verify_peer = True + credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) + EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials) + + def connectionLost(self, connector, reason): + """Called when an event server connection goes away""" + self.connections.discard(connector.transport) + + def connectionFailed(self, connector, reason): + """Called when an event server connection has an unrecoverable error""" + connector.failed = True + available_connectors = set(c for c in self.connectors if not c.failed) + if not available_connectors: + log.fatal("All Thor Event Servers have unrecoverable errors.") + NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self, data=TimestampedNotificationData()) + + def stop(self): + # Needs to be called from a green thread + self._shutdown() + + def _monitor_event_servers(self): + def wrapped_func(): + servers = self._get_event_servers() + self._update_event_servers(servers) + callInGreenThread(wrapped_func) + return KeepRunning + + def _disconnect_all(self): + for conn in self.connectors: + conn.disconnect() + + def _shutdown(self): + if self.disconnecting: + return + self.disconnecting = True + self.dns_monitor.cancel() + if self.advertiser: + self.advertiser.cancel() + if self.shutdown_message: + self._publish(self.shutdown_message) + requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] + for request in requests: + block_on(request.deferred) + self._disconnect_all() + + def handle_event(self, event): + #print "Received event: %s" % event + networks = self.networks + role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role + updated = False + for role in ('sip_proxy', 'conference_server'): + try: + network = networks[role] + except KeyError: + from thor import network as thor_network + network = thor_network.new(ThorNodeConfig.multiply) + networks[role] = network + new_nodes = set([node.ip for node in role_map.get(role, [])]) + old_nodes = set(network.nodes) + added_nodes = new_nodes - old_nodes + removed_nodes = old_nodes - new_nodes + if removed_nodes: + for node in removed_nodes: + network.remove_node(node) + plural = len(removed_nodes) != 1 and 's' or '' + log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) + updated = True + if added_nodes: + for node in added_nodes: + network.add_node(node) + plural = len(added_nodes) != 1 and 's' or '' + log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) + updated = True + #print "Thor %s nodes: %s" % (role, str(network.nodes)) + if updated: + NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=TimestampedNotificationData(networks=self.networks)) + diff --git a/sylk/server.py b/sylk/server.py index 258b337..9281e80 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,173 +1,228 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import sys from threading import Event from application import log from application.notification import NotificationCenter +from eventlet import api, proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError +from sipsimple.lookup import DNSManager from sipsimple.session import SessionManager +from sipsimple.threading import ThreadManager +from sipsimple.threading.green import run_in_green_thread from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from sylk.applications import IncomingRequestHandler -from sylk.configuration import SIPConfig +from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.backend import MemoryBackend from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = IncomingRequestHandler() self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) + notification_center.add_observer(self, name='ThorNetworkGotFatalError') self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryBackend()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager account_manager.default_account = account def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() notification_center.post_notification('SIPApplicationWillStart', sender=self, data=TimestampedNotificationData()) if self.state == 'stopping': reactor.stop() return account = account_manager.default_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle events={"conference": ["application/conference-info+xml"], "refer": ["message/sipfrag;version=2.0"]}, incoming_events=set(['conference']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server if account else False, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=TimestampedNotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, settings.audio.tail_length) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self, data=TimestampedNotificationData()) + @run_in_green_thread + def _shutdown_subsystems(self): + # cleanup internals + if self._wakeup_timer is not None and self._wakeup_timer.active(): + self._wakeup_timer.cancel() + self._wakeup_timer = None + + # shutdown SIPThor interface + sipthor_proc = proc.spawn(self._stop_sipthor) + sipthor_proc.wait() + + # shutdown middleware components + dns_manager = DNSManager() + account_manager = AccountManager() + session_manager = SessionManager() + procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] + proc.waitall(procs) + + # shutdown engine + engine = Engine() + engine.stop() + # TODO: timeout should be removed when the Engine is fixed so that it never hangs. -Saul + try: + with api.timeout(15): + while True: + notification = self._channel.wait() + if notification.name == 'SIPEngineDidEnd': + break + except api.TimeoutError: + pass + # stop threads + thread_manager = ThreadManager() + thread_manager.stop() + # stop the reactor + reactor.stop() + + def _start_sipthor(self): + if ThorNodeConfig.enabled: + from sylk.interfaces.sipthor import ConferenceNode + ConferenceNode() + + def _stop_sipthor(self): + if ThorNodeConfig.enabled: + from sylk.interfaces.sipthor import ConferenceNode + ConferenceNode().stop() + def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() self.request_handler.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass + # Start SIPThor interface + proc.spawn(self._start_sipthor) def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) + def _NH_ThorNetworkGotFatalError(self, notification): + self.stop() diff --git a/sylk/tls.py b/sylk/tls.py new file mode 100644 index 0000000..440e482 --- /dev/null +++ b/sylk/tls.py @@ -0,0 +1,53 @@ +# Copyright (C) 2007-2011 AG Projects. +# + +"""TLS helper classes""" + +__all__ = ['Certificate', 'PrivateKey'] + +from gnutls.crypto import X509Certificate, X509PrivateKey + +from application import log +from application.process import process + +class _FileError(Exception): pass + +def file_content(file): + path = process.config_file(file) + if path is None: + raise _FileError("File '%s' does not exist" % file) + try: + f = open(path, 'rt') + except Exception: + raise _FileError("File '%s' could not be open" % file) + try: + return f.read() + finally: + f.close() + +class Certificate(object): + """Configuration data type. Used to create a gnutls.crypto.X509Certificate object + from a file given in the configuration file.""" + def __new__(typ, value): + if isinstance(value, str): + try: + return X509Certificate(file_content(value)) + except Exception, e: + log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e))) + return None + else: + raise TypeError, 'value should be a string' + +class PrivateKey(object): + """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object + from a file given in the configuration file.""" + def __new__(typ, value): + if isinstance(value, str): + try: + return X509PrivateKey(file_content(value)) + except Exception, e: + log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e))) + return None + else: + raise TypeError, 'value should be a string' +