diff --git a/conference.ini.sample b/conference.ini.sample index 1e05027..06afdff 100644 --- a/conference.ini.sample +++ b/conference.ini.sample @@ -1,56 +1,53 @@ ; SylkServer Conference application configuration file [Conference] ; The following settings are the default used by the software, uncomment them ; only if you want to make changes ; db_uri = sqlite:///var/lib/sylkserver/conference.sqlite ; Database table name for storing messages history ; history_table = message_history ; Replay last chat messages after joining a room ; replay_history = 20 ; Directory for storing files transferred to rooms (a subdirectory for each room will be created) ; file_transfer_dir = /var/spool/sylkserver ; File transfer push support. If enabled files will be pushed to all active ; participants after receiving the file ; push_file_transfer = False ; Directory where images use by the Screen Sharing functionality will be stored ; screen_sharing_dir = /var/spool/sylkserver/screensharing ; IP address used for serving Screen Sharing HTTP requests, empty string means listen on interface used ; by the default route ; screen_sharing_ip = ; Port where Screen Sharing HTTP server will listen on, set to 0 for random ; screen_sharing_port = 0 ; Use HTTPS instead of HTTP ; screen_sharing_use_https = True ; Server certificate for HTTPS connections ; screen_sharing_certificate = /etc/sylkserver/tls/default.crt -; Advertise the conference server and each room in Bonjour -; use_bonjour = False - ; Access Lists Default Policy ; Apache-style access lists for the caller using SIP domains or SIP URIs ; https://httpd.apache.org/docs/2.2/mod/mod_authz_host.html#order ; ; access_policy = allow, deny ; allow = all ; deny = none ; Access Lists can be applied per room overriding the Default Policy ; ; [test@domain.com] ; access_policy = allow, deny ; allow = example.com, test@domain.com ; deny = all diff --git a/config.ini.sample b/config.ini.sample index 86fe0e4..762a221 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -1,84 +1,86 @@ ; SylkServer configuration file [Server] ; The following settings are the default used by the software, uncomment ; them only if you want to make changes ; default_application = conference ; Map user part of the Request URI to a specific application ; application_map = 123:conference,test:irc-conference ; Disable the specified applications ; disabled_applications = ; trace_dir = /var/log/sylkserver ; trace_core = False ; trace_sip = False ; trace_msrp = False ; trace_notifications = False ; TLS can be used for encryption of SIP signaling and MSRP media. TLS is ; disabled by default. To enable TLS, you must have a valid X.509 ; certificate and configure it below, then set the local_tls_port in the SIP ; section and use_tls in MSRP section ; The X.509 Certificate Authorities file ; ca_file = /etc/sylkserver/tls/ca.crt ; The file containing X.509 certificate and private key in unencrypted format ; certificate = /etc/sylkserver/tls/default.crt ; verify_server = False +; Enable Bonjour capabilities for applications +; enable_bonjour = False [SIP] ; SIP transport settings ; IP address used for SIP signaling; empty string or any means listen on interface used ; by the default route ; local_ip = ; Ports used for SIP transports, if not set to any value the transport will be disabled ; local_udp_port = 5060 ; local_tcp_port = 5060 ; local_tls_port = 5061 ; If set all outbound SIP requests will be sent through this SIP proxy ; outbound_proxy = ; A comma-separated list of hosts or networks to trust. ; The elements can be an IP address in CIDR format, a ; hostname or an IP address (in the latter 2 a mask of 32 ; is assumed), or the special keywords 'any' and 'none' ; (being equivalent to 0.0.0.0/0 and 0.0.0.0/32 ; respectively). It defaults to 'any'. ; trusted_peers = [MSRP] ; MSRP transport settings ; A valid X.509 certificate is required for MSRP to work over TLS. ; TLS is enabled by default, a default TLS certificate is provided with SylkServer. ; use_tls = True [RTP] ; RTP transport settings ; Allowed codec list, valid values: G722, speex, PCMU, PCMA, iLBC, GSM ; audio_codecs = G722,speex,PCMU,PCMA ; Port range used for RTP ; port_range = 50000:50500 ; SRTP valid values: disabled, mandatory, optional ; srtp_encryption = optional ; RTP stream timeout, session will be disconnected after this value ; timeout = 30 diff --git a/sylk-server b/sylk-server index 0e13e76..555f3a4 100755 --- a/sylk-server +++ b/sylk-server @@ -1,101 +1,101 @@ #!/usr/bin/env python # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import os import signal import sys from optparse import OptionParser from application import log from application.process import process, ProcessError import sylk DEBUG = False def main(): name = 'sylk-server' fullname = 'SylkServer' runtime_directory = '/var/run/sylkserver' system_config_directory = '/etc/sylkserver' default_pid = os.path.join(runtime_directory, 'server.pid') default_config = sylk.configuration_filename if os.path.isfile(sylk.configuration_filename) else os.path.join(system_config_directory, sylk.configuration_filename) parser = OptionParser(version='%%prog %s' % sylk.__version__) parser.add_option('--no-fork', action='store_false', dest='fork', default=1, help='run the process in the foreground (for debugging)') parser.add_option('--pid', dest='pid_file', help='pid file ("%s")' % default_pid, metavar='File') parser.add_option('--config-file', dest='config_file', default=default_config, help='path to configuration file to read ("%s")' % default_config, metavar='File') - parser.add_option('--use-bonjour', action='store_true', dest='use_bonjour', default=False, - help='enable bonjour services') + parser.add_option('--enable-bonjour', action='store_true', dest='enable_bonjour', default=False, + help='enable Bonjour services') (options, args) = parser.parse_args() path, configuration_filename = os.path.split(options.config_file) if path: system_config_directory = path process.system_config_directory = system_config_directory sylk.configuration_filename = process.config_file(configuration_filename) pid_file = options.pid_file or default_pid # when run in foreground, do not require root access because of /var/run/sylkserver if not options.fork: process._runtime_directory = None else: try: process.runtime_directory = runtime_directory process.daemonize(pid_file) except ProcessError, e: log.fatal("Cannot start %s: %s" % (fullname, e)) sys.exit(1) log.start_syslog(name) if sylk.configuration_filename: log.msg("Starting %s %s, config=%s" % (fullname, sylk.__version__, sylk.configuration_filename)) else: log.msg("Starting %s %s, with no configuration file" % (fullname, sylk.__version__)) try: if not options.fork and DEBUG: from application.debug.memory import memory_dump from sylk.server import SylkServer server = SylkServer() except Exception, e: log.fatal("failed to create %s: %s" % (fullname, e)) log.err() sys.exit(1) def stop_server(*args): log.msg('Stopping SylkServer...') server.stop() process.signals.add_handler(signal.SIGTERM, stop_server) process.signals.add_handler(signal.SIGINT, stop_server) try: server.start() signal.pause() server.stop_event.wait() log.msg("%s stopped" % fullname) except Exception, e: log.fatal("failed to run %s: %s" % (fullname, e)) log.err() sys.exit(1) if not options.fork and DEBUG: print "---------------------" memory_dump() print "---------------------" if __name__ == "__main__": main() diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index a22d325..7ab1eaf 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,403 +1,407 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import mimetypes import os import re from application.notification import IObserver, NotificationCenter from application.python import Null from gnutls.interfaces.twisted import X509Credentials from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup from sipsimple.streams import AudioStream from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import ISylkApplication, SylkApplication, ApplicationLogger from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.database import initialize as init_database from sylk.applications.conference.room import Room from sylk.applications.conference.web import ScreenSharingWebServer from sylk.bonjour import BonjourServices -from sylk.configuration import SIPConfig, ThorNodeConfig +from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.extensions import ChatStream from sylk.session import ServerSession from sylk.tls import Certificate, PrivateKey log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(object): __metaclass__ = SylkApplication implements(ISylkApplication, IObserver) __appname__ = 'conference' def __init__(self): self._rooms = {} self.pending_sessions = [] self.invited_participants_map = {} - self.bonjour_services = Null() + self.bonjour_focus_service = Null() + self.bonjour_room_service = Null() self.screen_sharing_web_server = None def start(self): init_database() - settings = SIPSimpleSettings() - if settings.bonjour.enabled or ConferenceConfig.use_bonjour: - self.bonjour_services = BonjourServices('sipfocus') - self.bonjour_services.start() + if ServerConfig.enable_bonjour: + self.bonjour_focus_service = BonjourServices(service='sipfocus') + self.bonjour_focus_service.start() log.msg("Bonjour publication started for service 'sipfocus'") + self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference') + self.bonjour_room_service.start() + log.msg("Bonjour publication started for service 'sipuri'") self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir) if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None: cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized) key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized) credentials = X509Credentials(cert, key) else: credentials = None self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials) listen_address = self.screen_sharing_web_server.listener.getHost() log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port)) def stop(self): - self.bonjour_services.stop() + self.bonjour_focus_service.stop() + self.bonjour_room_service.stop() self.screen_sharing_web_server.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: session.reject(488) return try: self.validate_acl(session._invitation.request_uri, session.remote_identity.uri) except ACLValidationError: session.reject(403) return # Check if requested files belong to this room for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'): try: room = self.get_room(session._invitation.request_uri) except RoomNotFoundError: session.reject(404) return try: file = (file for file in room.files if file.hash == stream.file_selector.hash).next() except StopIteration: session.reject(404) return filename = os.path.basename(file.name) for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)): if filename in filenames: path = os.path.join(dirpath, filename) stream.file_selector.fd = open(path, 'r') if stream.file_selector.size is None: stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size if stream.file_selector.type is None: mime_type, encoding = mimetypes.guess_type(filename) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' stream.file_selector.type = type break else: # File got removed from the filesystem session.reject(404) return self.pending_sessions.append(session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: subscribe_request.reject(480) return if not room.started: subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_sip_message(self, message_request, data): message_request.answer(405) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def add_participant(self, session, room_uri): log.msg('Outgoing session to %s started' % session.remote_identity.uri) # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = self.get_room(session._invitation.request_uri, True) # FIXME room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) if session.direction == 'incoming': room_uri = session._invitation.request_uri # FIXME else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.headers.get('To').uri self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'invite').lower() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'invite': log.msg('%s added %s to %s' % (self._refer_headers.get('From').uri, self.refer_to_uri, self.room_uri)) self._refer_request.accept() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'bye': log.msg('%s removed %s from %s' % (self._refer_headers.get('From').uri, self.refer_to_uri, self.room_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = AccountManager().sylkserver_account conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: return else: active_media = room.active_media if not active_media: return if 'audio' in active_media: self.streams.append(AudioStream(account)) if 'chat' in active_media: self.streams.append(ChatStream(account)) self.session = ServerSession(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) subject = u'Join conference request from %s' % original_identity self.session.connect(from_header, to_header, contact_header=contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, subject=subject, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/conference/configuration.py b/sylk/applications/conference/configuration.py index 37f3b39..35c51c7 100644 --- a/sylk/applications/conference/configuration.py +++ b/sylk/applications/conference/configuration.py @@ -1,132 +1,131 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # __all__ = ['ConferenceConfig', 'get_room_config'] import os import re from application.configuration import ConfigFile, ConfigSection, ConfigSetting from application.system import host from sylk.configuration.datatypes import IPAddress, NillablePath, Path, Port, URL # Datatypes class AccessPolicyValue(str): allowed_values = ('allow,deny', 'deny,allow') def __new__(cls, value): value = re.sub('\s', '', value) if value not in cls.allowed_values: raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values)) return str.__new__(cls, value) class Domain(str): domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$") def __new__(cls, value): value = str(value) if not cls.domain_re.match(value): raise ValueError("illegal domain: %s" % value) return str.__new__(cls, value) class SIPAddress(str): def __new__(cls, address): address = str(address) address = address.replace('@', '%40', address.count('@')-1) try: username, domain = address.split('@') Domain(domain) except ValueError: raise ValueError("illegal SIP address: %s, must be in user@domain format" % address) return str.__new__(cls, address) class PolicySettingValue(list): def __init__(self, value): if isinstance(value, (tuple, list)): l = [str(x) for x in value] elif isinstance(value, basestring): if value.lower() in ('none', ''): return list.__init__(self, []) elif value.lower() in ('any', 'all', '*'): return list.__init__(self, ['*']) else: l = re.split(r'\s*,\s*', value) else: raise TypeError("value must be a string, list or tuple") values = [] for item in l: if '@' in item: values.append(SIPAddress(item)) else: values.append(Domain(item)) return list.__init__(self, values) def match(self, uri): if self == ['*']: return True domain = uri.host uri = re.sub('^(sip:|sips:)', '', str(uri)) return uri in self or domain in self class WebURL(str): def __new__(cls, url): url = URL(url) if url.scheme.lower() not in ('http', 'https'): raise ValueError('invalid web URL: %s' % url.original_url) return url.url # Configuration objects class ConferenceConfig(ConfigSection): __cfgfile__ = 'conference.ini' __section__ = 'Conference' db_uri = ConfigSetting(type=str, value='sqlite://'+os.getcwd()+'/var/lib/sylkserver/conference.sqlite') history_table = ConfigSetting(type=str, value='message_history') replay_history = 20 access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) file_transfer_dir = ConfigSetting(type=Path, value=Path('var/spool/sylkserver')) push_file_transfer = False - use_bonjour = False screen_sharing_dir = ConfigSetting(type=Path, value=Path('var/spool/sylkserver/screensharing')) screen_sharing_ip = ConfigSetting(type=IPAddress, value=host.default_ip) screen_sharing_port = ConfigSetting(type=Port, value=0) screen_sharing_use_https = True screen_sharing_certificate = ConfigSetting(type=NillablePath, value=NillablePath('tls/default.crt')) class RoomConfig(ConfigSection): __cfgfile__ = 'conference.ini' access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny')) allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all')) deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none')) class Configuration(object): def __init__(self, data): self.__dict__.update(data) def get_room_config(room): config_file = ConfigFile(RoomConfig.__cfgfile__) section = config_file.get_section(room) if section is not None: RoomConfig.read(section=room) config = Configuration(dict(RoomConfig)) RoomConfig.reset() else: # Apply general policy config = Configuration(dict((attr, getattr(ConferenceConfig, attr)) for attr in ('access_policy', 'allow', 'deny'))) return config diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 79bd16b..70c550e 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1150 +1,1149 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import hashlib import os import random import re import shutil import string import weakref from collections import defaultdict from datetime import datetime from glob import glob from itertools import chain, cycle from time import mktime try: from weakref import WeakSet except ImportError: from backports.weakref import WeakSet from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.system import makedirs from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import FileTransferStream from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.streams.msrp import ChatStreamError, FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.conference import database from sylk.applications.conference.configuration import ConferenceConfig, URL from sylk.bonjour import BonjourServices -from sylk.configuration import SIPConfig, ThorNodeConfig +from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.datatypes import ResourcePath from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) def format_identity(identity, cpim_format=False): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) elif cpim_format: return u'' % (uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.sender = sender self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) from sylk.applications.conference import ConferenceApplication port = ConferenceApplication().screen_sharing_web_server.port scheme = 'https' if ConferenceConfig.screen_sharing_use_https else 'http' self.url = URL('%s://%s:%s/' % (scheme, ConferenceConfig.screen_sharing_ip, port)) self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename)) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError, e: log.msg('Cannot write screen sharing image: %s: %s' % (self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = '%s is sharing the screen at %s' % (format_identity(self.sender, cpim_format=True), self.url) room.dispatch_server_message(txt) log.msg(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender, cpim_format=True) room.dispatch_server_message(txt) log.msg(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = CPIMIdentity.parse('' % self.uri) self.identity.display_name = 'Conference Room' self.files = [] self.screen_images = {} self.sessions = [] self.sessions_with_proposals = {} self.subscriptions = [] self.transfer_handlers = WeakSet() self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.bonjour_services = Null() self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = defaultdict(lambda: 0) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) def start(self): if self.started: return - from sylk.applications.conference import ConferenceApplication - if ConferenceApplication().bonjour_services is not Null: + if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference': room_user = self.identity.uri.user self.bonjour_services = BonjourServices(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None [handler.stop() for handler in self.transfer_handlers] notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screen_sharing_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: return if message.timestamp is not None: value = message.timestamp timestamp = datetime(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, value.tzinfo) else: timestamp = datetime.utcnow() recipient = message.recipients[0] sender = message.sender sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), sender.display_name) message.sender = sender database.async_save_message(format_identity(session.remote_identity, True), self.uri, message.body, message.content_type, unicode(message.sender), unicode(recipient), timestamp) private = len(message.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_message(session, message) else: self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: return recipient = data.recipients[0] private = len(data.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching private message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_iscomposing(self, session, data): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_iscomposing(self, session, data): recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity) except ChatStreamError, e: log.error(u'Error dispatching private composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) self_identity = format_identity(self.identity, cpim_format=True) database.async_save_message(self_identity, self.uri, body, content_type, self_identity, self_identity, datetime.now()) def dispatch_conference_info(self): data = self.build_conference_info_payload() for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = CPIMIdentity.parse(file.sender).uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = OutgoingFileTransferHandler(self, uri, file) self.transfer_handlers.add(handler) handler.start() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if transfer_stream.direction == 'recvonly': transfer_handler = IncomingFileTransferHandler(self, session) transfer_handler.start() txt = u'%s is uploading file %s (%s)' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size)) else: transfer_handler = OutgoingFileTransferRequestHandler(self, session) transfer_handler.start() txt = u'%s requested file %s' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8')) log.msg(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] <= 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: timer = self.sessions_with_proposals.pop(session) except KeyError: pass else: if timer.active(): timer.cancel() try: chat_stream = (stream for stream in session.streams or [] if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams or [] if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, self.format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def build_conference_info_payload(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) user_count = len(self.participants_counter.keys()) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = (user for user in users if user.entity == str(session.remote_identity.uri)).next() except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) if self.bonjour_services is Null: user = conference.User(str(session.remote_identity.uri), display_text=display_text) else: user = conference.User(str(session._invitation.remote_contact_header.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscribe_request) data = self.build_conference_info_payload() subscribe_request.accept(conference.ConferenceDocument.content_type, data) self.subscriptions.append(subscribe_request) def accept_proposal(self, session, streams): self.sessions_with_proposals.pop(session) session.accept_proposal(streams) def add_file(self, file): if file.status == 'INCOMPLETE': self.dispatch_server_message('%s has cancelled upload of file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) else: self.dispatch_server_message('%s has uploaded file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream._session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith('text/'): self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': self.add_screen_image(message.sender, message.body) def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and self.session_nickname_map[session] != nickname: notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) try: self.subscriptions.remove(subscription) except ValueError: pass def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.dispatch_conference_info() def _NH_SIPSessionGotProposal(self, notification): session = notification.sender audio_streams = [stream for stream in notification.data.streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(4, self.accept_proposal, session, streams) self.sessions_with_proposals[session] = timer def _NH_SIPSessionGotRejectProposal(self, notification): session = notification.sender try: timer = self.sessions_with_proposals.pop(session) except KeyError: # If the proposal couldn't be accepted by us we will not add a timer pass else: if timer.active(): timer.cancel() def _NH_SIPSessionDidRenegotiateStreams(self, notification): notification_center = NotificationCenter() session = notification.sender streams = notification.data.streams if notification.data.action == 'add': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) log.msg(u'%s has added chat to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) log.msg(u'%s has added audio to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added audio' % format_identity(session.remote_identity), exclude=session) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) elif notification.data.action == 'remove': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) log.msg(u'%s has removed chat from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() log.msg(u'%s has removed audio from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed audio' % format_identity(session.remote_identity), exclude=session) if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): notification.sender.reject_transfer(403) @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_play=False, volume=20) self.paused = True self.conference.bridge.add(self._player) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._player) def stop(self): if self._player is None: return notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._player) self._player.stop() self.paused = True self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() log.msg(u'Started playing music on hold') def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() log.msg(u'Stopped playing music on hold') def _play_next_file(self): self._player.filename = self.files.next() self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() def _NH_WavePlayerDidEnd(self, notification): if not self.paused: self._play_next_file() class InterruptWelcome(Exception): pass class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.procs = proc.RunningProcSet() @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.procs.spawn(self.play_audio_welcome, welcome_prompt) self.procs.spawn(self.render_chat_welcome, welcome_prompt) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = (stream for stream in self.session.streams if stream.type == 'audio').next() except StopIteration: return try: player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_play=False, volume=50) audio_stream.bridge.add(player) if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self.play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self.play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self.play_file_in_player(player, file, 0) file = ResourcePath('sounds/connected_tone.wav').normalized self.play_file_in_player(player, file, 0.1) audio_stream.bridge.remove(player) except InterruptWelcome: try: audio_stream.bridge.remove(player) except ValueError: pass else: self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() def render_chat_welcome_prompt(self): txt = 'Welcome to the conference.' user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)])) if user_count == 0: txt += ' You are the first participant in the room.' else: if user_count == 1: txt += ' There is one more participant in the room.' else: txt += ' There are %s more participants in the room.' % user_count return txt def render_chat_welcome(self, welcome_prompt): try: chat_stream = (stream for stream in self.session.streams if stream.type == 'chat').next() except StopIteration: return try: #welcome_prompt = self.render_chat_welcome_prompt() #chat_stream.send_message(welcome_prompt, 'text/plain', local_identity=self.room.identity, recipients=[self.room.identity]) remote_identity = CPIMIdentity.parse(format_identity(self.session.remote_identity, cpim_format=True)) for msg in database.get_last_messages(self.room.uri, ConferenceConfig.replay_history): recipient = CPIMIdentity.parse(msg.cpim_recipient) sender = CPIMIdentity.parse(msg.cpim_sender) if recipient.uri in (self.room.identity.uri, remote_identity.uri) or sender.uri == remote_identity.uri: chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp) except InterruptWelcome: pass def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall(InterruptWelcome) class RoomFile(object): def __init__(self, name, hash, size, sender, status): self.name = name self.hash = hash self.size = size self.sender = sender self.status = status @property def file_selector(self): return FileSelector.for_file(self.name.encode('utf-8'), hash=self.hash) class IncomingFileTransferHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.room_uri = room.uri self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly').next() self.error = False self.ended = False self.file = None self.file_selector = None self.filename = None self.hash = None self.status = None self.timer = None self.transfer_finished = False def start(self): self.file_selector = self.stream.file_selector path = os.path.join(ConferenceConfig.file_transfer_dir, self.room_uri) makedirs(path) self.filename = filename = os.path.join(path, self.file_selector.name.decode('utf-8')) basename, ext = os.path.splitext(filename) i = 1 while os.path.exists(filename): filename = '%s_%d%s' % (basename, i, ext) i += 1 self.filename = filename try: self.file = open(self.filename, 'wb') except EnvironmentError: log.msg('Cannot write destination filename: %s' % self.filename) self.session.end() return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) self.hash = hashlib.sha1() @run_in_thread('file-transfer') def write_chunk(self, data): notification_center = NotificationCenter() if data is not None: try: self.file.write(data) except EnvironmentError, e: notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e))) else: self.hash.update(data) else: self.file.close() if self.error: notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self) else: notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self) @run_in_thread('file-io') def remove_bogus_file(self, filename): try: os.unlink(filename) except OSError: pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidEnd(self, notification): self.ended = True if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) # Mark end of write operation self.write_chunk(None) def _NH_FileTransferStreamGotChunk(self, notification): self.write_chunk(notification.data.content) def _NH_FileTransferStreamDidFinish(self, notification): self.transfer_finished = True if self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerGotError(self, notification): log.error('Error while handling incoming file transfer: %s' % notification.data.error) self.error = True self.status = notification.data.error if not self.ended and self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) remote_hash = self.file_selector.hash if not self.transfer_finished: log.msg('File transfer of %s cancelled' % os.path.basename(self.filename)) self.remove_bogus_file(self.filename) self.status = 'INCOMPLETE' else: local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper())) if local_hash != remote_hash: log.warning('Hash of transferred file does not match the remote hash (file may have changed).') self.status = 'Hash missmatch' self.remove_bogus_file(self.filename) else: self.status = 'OK' file = RoomFile(self.filename, remote_hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None def _NH_IncomingFileTransferHandlerDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None class OutgoingFileTransferRequestHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer').next() self.timer = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd class InterruptFileTransfer(Exception): pass class OutgoingFileTransferHandler(object): implements(IObserver) def __init__(self, room, destination, file): self.room_uri = room.identity.uri self.destination = destination self.file = file self.session = None self.stream = None self.timer = None @run_in_green_thread def start(self): self.greenlet = api.getcurrent() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(self.destination) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: return notification_center = NotificationCenter() self.session = ServerSession(account) self.stream = FileTransferStream(account, self.file.file_selector, 'sendonly') notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) subject = u'File uploaded by %s' % self.file.sender from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(self.destination)) transport = routes[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) originator_uri = CPIMIdentity.parse(self.file.sender).uri extra_headers.append(Header('X-Originator-From', str(originator_uri))) self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, subject=subject, extra_headers=extra_headers) def stop(self): if self.session is not None: self.session.end() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd diff --git a/sylk/configuration/__init__.py b/sylk/configuration/__init__.py index f5d98c7..8f09bb5 100644 --- a/sylk/configuration/__init__.py +++ b/sylk/configuration/__init__.py @@ -1,72 +1,73 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from application.configuration import ConfigSection, ConfigSetting from application.configuration.datatypes import NetworkRangeList, StringList from application.system import host from sipsimple.configuration.datatypes import NonNegativeInteger, SRTPEncryption from sylk import configuration_filename from sylk.configuration.datatypes import AudioCodecs, IPAddress, NillablePath, Path, Port, PortRange, SIPProxyAddress from sylk.tls import Certificate, PrivateKey class ServerConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'Server' ca_file = ConfigSetting(type=NillablePath, value=NillablePath('tls/ca.crt')) certificate = ConfigSetting(type=NillablePath, value=NillablePath('tls/default.crt')) verify_server = False + enable_bonjour = False default_application = 'conference' application_map = ConfigSetting(type=StringList, value='') disabled_applications = ConfigSetting(type=StringList, value='') resources_dir = ConfigSetting(type=Path, value=None) trace_dir = ConfigSetting(type=Path, value=Path('var/log/sylkserver')) trace_core = False trace_sip = False trace_msrp = False trace_notifications = False class SIPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'SIP' local_ip = ConfigSetting(type=IPAddress, value=host.default_ip) local_udp_port = ConfigSetting(type=Port, value=5060) local_tcp_port = ConfigSetting(type=Port, value=5060) local_tls_port = ConfigSetting(type=Port, value=5061) outbound_proxy = ConfigSetting(type=SIPProxyAddress, value=None) trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('any')) class MSRPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'MSRP' use_tls = True class RTPConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'RTP' audio_codecs = ConfigSetting(type=AudioCodecs, value=None) port_range = ConfigSetting(type=PortRange, value=PortRange('50000:50500')) srtp_encryption = ConfigSetting(type=SRTPEncryption, value='optional') timeout = ConfigSetting(type=NonNegativeInteger, value=30) class ThorNodeConfig(ConfigSection): __cfgfile__ = configuration_filename __section__ = 'ThorNetwork' enabled = False domain = "sipthor.net" multiply = 1000 certificate = ConfigSetting(type=Certificate, value=None) private_key = ConfigSetting(type=PrivateKey, value=None) ca = ConfigSetting(type=Certificate, value=None) diff --git a/sylk/configuration/settings.py b/sylk/configuration/settings.py index f6548de..48f3742 100644 --- a/sylk/configuration/settings.py +++ b/sylk/configuration/settings.py @@ -1,125 +1,120 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # """ SIP SIMPLE SDK settings extensions. """ __all__ = ['AccountExtension', 'BonjourAccountExtension', 'SylkServerSettingsExtension'] from sipsimple.account import MSRPSettings as AccountMSRPSettings, NATTraversalSettings as AccountNATTraversalSettings from sipsimple.account import RTPSettings as AccountRTPSettings, SIPSettings as AccountSIPSettings, TLSSettings as AccountTLSSettings from sipsimple.configuration import CorrelatedSetting, Setting, SettingsGroup, SettingsObjectExtension from sipsimple.configuration.datatypes import MSRPConnectionModel, MSRPTransport, NonNegativeInteger, PortRange, SampleRate, SIPTransportList, SRTPEncryption from sipsimple.configuration.settings import AudioSettings, LogsSettings, RTPSettings, SIPSettings, TLSSettings from sylk import __version__ as server_version from sylk.configuration import ServerConfig, SIPConfig, MSRPConfig, RTPConfig from sylk.configuration.datatypes import AudioCodecs, NillablePath, Path, Port, SIPProxyAddress # Account settings extensions class AccountMSRPSettingsExtension(AccountMSRPSettings): transport = Setting(type=MSRPTransport, default='tls' if MSRPConfig.use_tls else 'tcp') connection_model = Setting(type=MSRPConnectionModel, default='acm') class AccountNATTraversalSettingsExtension(AccountNATTraversalSettings): use_msrp_relay_for_inbound = Setting(type=bool, default=False) use_msrp_relay_for_outbound = Setting(type=bool, default=False) class AccountRTPSettingsExtension(AccountRTPSettings): audio_codec_list = Setting(type=AudioCodecs, default=RTPConfig.audio_codecs, nillable=True) srtp_encryption = Setting(type=SRTPEncryption, default=RTPConfig.srtp_encryption) use_srtp_without_tls = Setting(type=bool, default=True) class AccountSIPSettingsExtension(AccountSIPSettings): register = Setting(type=bool, default=False) outbound_proxy = Setting(type=SIPProxyAddress, default=SIPConfig.outbound_proxy, nillable=True) class AccountTLSSettingsExtension(AccountTLSSettings): certificate = Setting(type=NillablePath, default=ServerConfig.certificate, nillable=True) verify_server = Setting(type=bool, default=ServerConfig.verify_server) class AccountExtension(SettingsObjectExtension): enabled = Setting(type=bool, default=True) msrp = AccountMSRPSettingsExtension nat_traversal = AccountNATTraversalSettingsExtension rtp = AccountRTPSettingsExtension sip = AccountSIPSettingsExtension tls = AccountTLSSettingsExtension class BonjourAccountExtension(SettingsObjectExtension): enabled = Setting(type=bool, default=False) # General settings extensions class AudioSettingsExtension(AudioSettings): input_device = Setting(type=str, default=None, nillable=True) output_device = Setting(type=str, default=None, nillable=True) sample_rate = Setting(type=SampleRate, default=16000) -class BonjourServices(SettingsGroup): - enabled = Setting(type=bool, default=False) - - class LogsSettingsExtension(LogsSettings): directory = Setting(type=Path, default=ServerConfig.trace_dir) trace_sip = Setting(type=bool, default=ServerConfig.trace_sip) trace_msrp = Setting(type=bool, default=ServerConfig.trace_msrp) trace_pjsip = Setting(type=bool, default=ServerConfig.trace_core) trace_notifications = Setting(type=bool, default=ServerConfig.trace_notifications) class RTPSettingsExtension(RTPSettings): port_range = Setting(type=PortRange, default=PortRange(RTPConfig.port_range.start, RTPConfig.port_range.end)) timeout = Setting(type=NonNegativeInteger, default=RTPConfig.timeout) def sip_port_validator(port, sibling_port): if port == sibling_port != 0: raise ValueError("the TCP and TLS ports must be different") transport_list = [] if SIPConfig.local_udp_port is not None: transport_list.append('udp') if SIPConfig.local_tcp_port is not None: transport_list.append('tcp') if SIPConfig.local_tls_port is not None: transport_list.append('tls') udp_port = SIPConfig.local_udp_port or 0 tcp_port = SIPConfig.local_tcp_port or 0 tls_port = SIPConfig.local_tls_port or 0 class SIPSettingsExtension(SIPSettings): udp_port = Setting(type=Port, default=udp_port) tcp_port = CorrelatedSetting(type=Port, sibling='tls_port', validator=sip_port_validator, default=tcp_port) tls_port = CorrelatedSetting(type=Port, sibling='tcp_port', validator=sip_port_validator, default=tls_port) transport_list = Setting(type=SIPTransportList, default=transport_list) class TLSSettingsExtension(TLSSettings): ca_list = Setting(type=NillablePath, default=ServerConfig.ca_file, nillable=True) class SylkServerSettingsExtension(SettingsObjectExtension): user_agent = Setting(type=str, default='SylkServer-%s' % server_version) audio = AudioSettingsExtension - bonjour = BonjourServices logs = LogsSettingsExtension rtp = RTPSettingsExtension sip = SIPSettingsExtension tls = TLSSettingsExtension diff --git a/sylk/server.py b/sylk/server.py index 685750e..55fd01e 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,240 +1,239 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import sys from threading import Event from application import log from application.notification import NotificationCenter, NotificationData from eventlib import api, proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions from sylk.applications import IncomingRequestHandler -from sylk.configuration import SIPConfig, ThorNodeConfig +from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger from sylk.session import SessionManager class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = None self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryStorage()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): - # Command line options - settings = SIPSimpleSettings() - settings.bonjour.enabled = '--use-bonjour' in sys.argv # Horrible hack, I know + if '--enable-bonjour' in sys.argv: + ServerConfig.enable_bonjour = True account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager # Disable MSRP ACM if we are using Bonjour - account.msrp.connection_model = 'relay' if settings.bonjour.enabled else 'acm' + account.msrp.connection_model = 'relay' if ServerConfig.enable_bonjour else 'acm' account.save() account_manager.sylkserver_account = account @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return account = account_manager.sylkserver_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events=set(['conference', 'presence']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) @run_in_green_thread def _shutdown_subsystems(self): # cleanup internals if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None # shutdown SIPThor interface sipthor_proc = proc.spawn(self._stop_sipthor) sipthor_proc.wait() # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # shutdown engine engine = Engine() engine.stop() # TODO: timeout should be removed when the Engine is fixed so that it never hangs. -Saul try: with api.timeout(15): while True: notification = self._channel.wait() if notification.name == 'SIPEngineDidEnd': break except api.TimeoutError: pass # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _start_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode() def _stop_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode().stop() def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass # Start request handler self.request_handler = IncomingRequestHandler() self.request_handler.start() # Start SIPThor interface proc.spawn(self._start_sipthor) def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) def _NH_ThorNetworkGotFatalError(self, notification): self.stop()