diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py index 4d5cb9e..8743570 100644 --- a/sylk/applications/__init__.py +++ b/sylk/applications/__init__.py @@ -1,348 +1,365 @@ __all__ = ['ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger'] import abc +import imp import os import socket import struct import sys -from collections import defaultdict - from application import log from application.configuration.datatypes import NetworkRange from application.notification import IObserver, NotificationCenter from application.python import Null +from application.python.decorator import execute_once from application.python.types import Singleton +from collections import defaultdict from itertools import chain from sipsimple.threading import run_in_twisted_thread from zope.interface import implements from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig SYLK_APP_HEADER = 'X-Sylk-App' +def find_builtin_applications(): + applications_directory = os.path.dirname(__file__) + for path, dirs, files in os.walk(applications_directory): + parent_directory, name = os.path.split(path) + if parent_directory == applications_directory and '__init__.py' in files and name not in ServerConfig.disabled_applications: + yield name + if path != applications_directory: + del dirs[:] # do not descend more than 1 level + + +def find_extra_applications(): + if ServerConfig.extra_applications_dir: + applications_directory = os.path.realpath(ServerConfig.extra_applications_dir.normalized) + for path, dirs, files in os.walk(applications_directory): + parent_directory, name = os.path.split(path) + if parent_directory == applications_directory and '__init__.py' in files and name not in ServerConfig.disabled_applications: + yield name + if path != applications_directory: + del dirs[:] # do not descend more than 1 level + + +def find_applications(): + return chain(find_builtin_applications(), find_extra_applications()) + + class ApplicationRegistry(object): __metaclass__ = Singleton def __init__(self): - self.applications = [] + self.application_map = {} + + def __getitem__(self, name): + return self.application_map[name] + + def __contains__(self, name): + return name in self.application_map def __iter__(self): - return iter(self.applications) + return iter(self.application_map.values()) + + def __len__(self): + return len(self.application_map) + + @execute_once + def load_applications(self): + for name in find_builtin_applications(): + try: + __import__('sylk.applications.{name}'.format(name=name)) + except ImportError as e: + log.error('Failed to load builtin application {name!r}: {exception!s}'.format(name=name, exception=e)) + for name in find_extra_applications(): + if name in sys.modules: + # being able to log this is contingent on this function only executing once + log.warning('Not loading extra application {name!r} as it would overshadow a system package/module'.format(name=name)) + continue + try: + imp.load_module(name, *imp.find_module(name, [ServerConfig.extra_applications_dir.normalized])) + except ImportError as e: + log.error('Failed to load extra application {name!r}: {exception!s}'.format(name=name, exception=e)) - def find_application(self, name): + def add(self, app_class): try: - return next(app for app in self.applications if app.__appname__ == name) - except StopIteration: - return None + app = app_class() + except Exception as e: + log.exception('Failed to initialize {app.__appname__!r} application: {exception!s}'.format(app=app_class, exception=e)) + else: + self.application_map[app.__appname__] = app - def add(self, app): - if app not in self.applications: - self.applications.append(app) + def get(self, name, default=None): + return self.application_map.get(name, default) class ApplicationName(object): - def __get__(self, obj, objtype): - name = objtype.__name__ + def __get__(self, instance, instance_type): + name = instance_type.__name__ return name[:-11].lower() if name.endswith('Application') else name.lower() class SylkApplicationMeta(abc.ABCMeta, Singleton): """Metaclass for defining SylkServer applications: a Singleton that also adds them to the application registry""" + def __init__(cls, name, bases, dic): super(SylkApplicationMeta, cls).__init__(name, bases, dic) if name != 'SylkApplication': ApplicationRegistry().add(cls) class SylkApplication(object): """Base class for all SylkServer applications""" __metaclass__ = SylkApplicationMeta __appname__ = ApplicationName() @abc.abstractmethod def start(self): pass @abc.abstractmethod def stop(self): pass @abc.abstractmethod def incoming_session(self, session): pass @abc.abstractmethod def incoming_subscription(self, subscribe_request, data): pass @abc.abstractmethod def incoming_referral(self, refer_request, data): pass @abc.abstractmethod def incoming_message(self, message_request, data): pass -def load_builtin_applications(): - toplevel = os.path.dirname(__file__) - app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] - for module in ['sylk.applications.%s' % item for item in set(app_list).difference(ServerConfig.disabled_applications)]: - try: - __import__(module) - except ImportError, e: - log.warning('Error loading builtin "%s" application: %s' % (module, e)) - -def load_extra_applications(): - if ServerConfig.extra_applications_dir: - toplevel = os.path.realpath(os.path.abspath(ServerConfig.extra_applications_dir.normalized)) - if os.path.isdir(toplevel): - app_list = [item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] - sys.path.append(toplevel) - for module in (item for item in set(app_list).difference(ServerConfig.disabled_applications)): - try: - __import__(module) - except ImportError, e: - log.warning('Error loading extra "%s" application: %s' % (module, e)) - -def load_applications(): - load_builtin_applications() - load_extra_applications() - for app in ApplicationRegistry(): - try: - app() - except Exception, e: - log.warning('Error loading application: %s' % e) - log.err() - - class ApplicationNotLoadedError(Exception): pass + class IncomingRequestHandler(object): - """ - Handle incoming requests and match them to applications. - """ + """Handle incoming requests and match them to applications""" + __metaclass__ = Singleton implements(IObserver) def __init__(self): - load_applications() - registry = ApplicationRegistry() - self.applications = dict((app.__appname__, app) for app in registry) - log.msg('Loaded applications: %s' % ', '.join(self.applications)) - default_application = registry.find_application(ServerConfig.default_application) - if default_application is None: + self.application_registry = ApplicationRegistry() + self.application_registry.load_applications() + log.info('Loaded applications: {}'.format(', '.join(sorted(app.__appname__ for app in self.application_registry)))) + if ServerConfig.default_application not in self.application_registry: log.warning('Default application "%s" does not exist, falling back to "conference"' % ServerConfig.default_application) ServerConfig.default_application = 'conference' else: log.msg('Default application: %s' % ServerConfig.default_application) self.application_map = dict((item.split(':')) for item in ServerConfig.application_map) if self.application_map: txt = 'Application map:\n' invert_app_map = defaultdict(list) for url, app in self.application_map.iteritems(): invert_app_map[app].append(url) for app, urls in invert_app_map.iteritems(): txt += ' * %s:\n' % app for url in urls: txt += ' - %s\n' % url log.msg(txt[:-1]) self.authorization_handler = AuthorizationHandler() def start(self): - for app in ApplicationRegistry(): + for app in self.application_registry: try: - app().start() - except Exception, e: - log.warning('Error starting application: %s' % e) - log.err() + app.start() + except Exception as e: + log.exception('Failed to start {app.__appname__!r} application: {exception!s}'.format(app=app, exception=e)) self.authorization_handler.start() notification_center = NotificationCenter() notification_center.add_observer(self, name='SIPSessionNewIncoming') notification_center.add_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.add_observer(self, name='SIPIncomingReferralGotRefer') notification_center.add_observer(self, name='SIPIncomingRequestGotRequest') def stop(self): self.authorization_handler.stop() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SIPSessionNewIncoming') notification_center.remove_observer(self, name='SIPIncomingSubscriptionGotSubscribe') notification_center.remove_observer(self, name='SIPIncomingReferralGotRefer') notification_center.remove_observer(self, name='SIPIncomingRequestGotRequest') - for app in ApplicationRegistry(): + for app in self.application_registry: try: - app().stop() - except Exception, e: - log.warning('Error stopping application: %s' % e) - log.err() + app.stop() + except Exception as e: + log.exception('Failed to stop {app.__appname__!r} application: {exception!s}'.format(app=app, exception=e)) def get_application(self, ruri, headers): if SYLK_APP_HEADER in headers: - application = headers[SYLK_APP_HEADER].body.strip() + application_name = headers[SYLK_APP_HEADER].body.strip() else: - application = ServerConfig.default_application + application_name = ServerConfig.default_application if self.application_map: prefixes = ("%s@%s" % (ruri.user, ruri.host), ruri.host, ruri.user) for prefix in prefixes: if prefix in self.application_map: - application = self.application_map[prefix] + application_name = self.application_map[prefix] break try: - app = self.applications[application] + return self.application_registry[application_name] except KeyError: - log.error('Application %s is not loaded' % application) + log.error('Application %s is not loaded' % application_name) raise ApplicationNotLoadedError - else: - return app() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewIncoming(self, notification): session = notification.sender try: self.authorization_handler.authorize_source(session.peer_address.ip) except UnauthorizedRequest: session.reject(403) return try: app = self.get_application(session.request_uri, notification.data.headers) except ApplicationNotLoadedError: session.reject(404) else: app.incoming_session(session) def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification): subscribe_request = notification.sender try: self.authorization_handler.authorize_source(subscribe_request.peer_address.ip) except UnauthorizedRequest: subscribe_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: subscribe_request.reject(404) else: app.incoming_subscription(subscribe_request, notification.data) def _NH_SIPIncomingReferralGotRefer(self, notification): refer_request = notification.sender try: self.authorization_handler.authorize_source(refer_request.peer_address.ip) except UnauthorizedRequest: refer_request.reject(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: refer_request.reject(404) else: app.incoming_referral(refer_request, notification.data) def _NH_SIPIncomingRequestGotRequest(self, notification): request = notification.sender if notification.data.method != 'MESSAGE': request.answer(405) return try: self.authorization_handler.authorize_source(request.peer_address.ip) except UnauthorizedRequest: request.answer(403) return try: app = self.get_application(notification.data.request_uri, notification.data.headers) except ApplicationNotLoadedError: request.answer(404) else: app.incoming_message(request, notification.data) class UnauthorizedRequest(Exception): pass + class AuthorizationHandler(object): implements(IObserver) def __init__(self): self.state = None self.trusted_peers = SIPConfig.trusted_peers self.thor_nodes = [] @property def trusted_parties(self): if ThorNodeConfig.enabled: return self.thor_nodes return self.trusted_peers def start(self): NotificationCenter().add_observer(self, name='ThorNetworkGotUpdate') self.state = 'started' def stop(self): self.state = 'stopped' NotificationCenter().remove_observer(self, name='ThorNetworkGotUpdate') def authorize_source(self, ip_address): if self.state != 'started': raise UnauthorizedRequest for range in self.trusted_parties: if struct.unpack('!L', socket.inet_aton(ip_address))[0] & range[1] == range[0]: return True raise UnauthorizedRequest @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_ThorNetworkGotUpdate(self, notification): self.thor_nodes = [NetworkRange(node) for node in chain.from_iterable(n.nodes for n in notification.data.networks.values())] class ApplicationLogger(object): __metaclass__ = Singleton @classmethod def for_package(cls, package): return cls(package.split('.')[-1]) def __init__(self, prefix): self.prefix = '[%s] ' % prefix def info(self, message, **context): log.info(self.prefix+message, **context) def warning(self, message, **context): log.warning(self.prefix+message, **context) def debug(self, message, **context): log.debug(self.prefix+message, **context) def error(self, message, **context): log.error(self.prefix+message, **context) def critical(self, message, **context): log.critical(self.prefix+message, **context) def exception(self, message=None, **context): if message is not None: message = self.prefix+message log.exception(message, **context) # Some aliases that are commonly used msg = info warn = warning fatal = critical err = exception diff --git a/sylk/server.py b/sylk/server.py index d5a9d27..b9af70c 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,259 +1,259 @@ import os import sys from threading import Event from uuid import uuid4 from application import log from application.notification import NotificationCenter from application.python import Null from application.system import makedirs from eventlib import proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from sipsimple.video import VideoDevice from twisted.internet import reactor # Load stream extensions needed for integration with SIP SIMPLE SDK import sylk.streams del sylk.streams from sylk.accounts import DefaultAccount from sylk.applications import IncomingRequestHandler from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger from sylk.session import SessionManager from sylk.web import WebServer class SylkServer(SIPApplication): def __init__(self): self.request_handler = Null self.thor_interface = Null self.web_server = Null self.logger = Logger() self.options = Null self.stopping_event = Event() self.stop_event = Event() self.failed = False def start(self, options): self.options = options if self.options.enable_bonjour: ServerConfig.enable_bonjour = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: super(SylkServer, self).start(MemoryStorage()) except Exception, e: log.fatal("Error starting SIP Application: %s" % e) sys.exit(1) def _initialize_core(self): # SylkServer needs to listen for extra events and request types notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), enable_colorbar_device=True, # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events={'conference', 'presence'}, incoming_requests={'MESSAGE'}) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() dns_manager = DNSManager() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return # Initialize default account default_account = DefaultAccount() account_manager.default_account = default_account # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize video objects self.video_device = VideoDevice(u'Colorbar generator', settings.video.resolution, settings.video.framerate) # initialize instance id settings.instance_id = uuid4().urn settings.save() # initialize ZRTP cache makedirs(ServerConfig.spool_dir.normalized) self.engine.zrtp_cache = os.path.join(ServerConfig.spool_dir.normalized, 'zrtp.db') # initialize middleware components dns_manager.start() account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) # start SylkServer components self.web_server = WebServer() self.web_server.start() self.request_handler = IncomingRequestHandler() self.request_handler.start() if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode self.thor_interface = ConferenceNode() thor_roles = [] - if 'conference' in self.request_handler.applications: + if 'conference' in self.request_handler.application_registry: thor_roles.append('conference_server') - if 'xmppgateway' in self.request_handler.applications: + if 'xmppgateway' in self.request_handler.application_registry: thor_roles.append('xmpp_gateway') - if 'webrtcgateway' in self.request_handler.applications: + if 'webrtcgateway' in self.request_handler.application_registry: thor_roles.append('webrtc_gateway') self.thor_interface.start(thor_roles) @run_in_green_thread def _shutdown_subsystems(self): dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() # terminate all sessions p = proc.spawn(session_manager.stop) p.wait() # shutdown SylkServer components procs = [proc.spawn(self.web_server.stop), proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)] proc.waitall(procs) # shutdown other middleware components procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop)] proc.waitall(procs) # shutdown engine self.engine.stop() self.engine.join(timeout=5) # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _NH_AudioDevicesDidChange(self, notification): pass def _NH_DefaultAudioDeviceDidChange(self, notification): pass def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) sys.exit(1) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): self.stopping_event.set() def _NH_SIPApplicationDidEnd(self, notification): log.msg('SIP application ended') self.logger.stop() if not self.stopping_event.is_set(): log.warning('SIP application ended without shutting down all subsystems') self.stopping_event.set() self.stop_event.set() def _NH_SIPApplicationGotFatalError(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) self.failed = True def _NH_SIPEngineDidFail(self, notification): log.error('SIP engine failed') self.failed = True super(SylkServer, self)._NH_SIPEngineDidFail(notification) def _NH_ThorNetworkGotFatalError(self, notification): log.error("All Thor Event Servers have unrecoverable errors.")