diff --git a/sylk/extensions.py b/sylk/extensions.py index 4e68969..986a11e 100644 --- a/sylk/extensions.py +++ b/sylk/extensions.py @@ -1,135 +1,204 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import random from datetime import datetime from dateutil.tz import tzlocal from application.notification import NotificationCenter +from eventlet import api from msrplib.protocol import URI from msrplib.session import contains_mime_type +from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from sipsimple.account import AccountManager from sipsimple.core import SDPAttribute from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage, State, LastActive, Refresh, ContentType from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError -from sipsimple.streams.msrp import ChatStream as _ChatStream, ChatStreamError, MSRPStreamBase +from sipsimple.streams.msrp import ChatStreamError, MSRPStreamError, NotificationProxyLogger +from sipsimple.streams.msrp import ChatStream as _ChatStream, MSRPStreamBase as _MSRPStreamBase +from sipsimple.threading.green import run_in_green_thread from sipsimple.util import TimestampedNotificationData +from twisted.python.failure import Failure from sylk.configuration import SIPConfig from sylk.session import ServerSession # We need to match on the only account that will be available def _always_find_default_account(self, contact_uri): return self.default_account AccountManager.find_account = _always_find_default_account # Patch sipsimple.session to use ServerSession instead import sipsimple.session sipsimple.session.Session = ServerSession # We need to be able to set the local identity in the message CPIM envelope # so that messages appear to be coming from the users themselves, instead of # just seeying the server identity registry = MediaStreamRegistry() for stream_type in registry.stream_types[:]: if stream_type is _ChatStream: registry.stream_types.remove(stream_type) break del registry -class ChatStream(_ChatStream): +class MSRPStreamBase(_MSRPStreamBase): + @run_in_green_thread + def initialize(self, session, direction): + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self) + try: + self.session = session + self.transport = self.account.msrp.transport + outgoing = direction=='outgoing' + logger = NotificationProxyLogger() + if self.account.msrp.connection_model == 'relay': + if not outgoing and self.remote_role in ('actpass', 'passive'): + # 'passive' not allowed by the RFC but play nice for interoperability. -Saul + self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) + self.local_role = 'active' + elif not outgoing and not self.account.nat_traversal.use_msrp_relay_for_inbound: + if self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): + raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") + self.msrp_connector = DirectAcceptor(logger=logger) + self.local_role = 'passive' + elif outgoing and not self.account.nat_traversal.use_msrp_relay_for_outbound: + self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) + self.local_role = 'active' + else: + if self.account.nat_traversal.msrp_relay is None: + relay_host = relay_port = None + else: + if self.transport != self.account.nat_traversal.msrp_relay.transport: + raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") + relay_host = self.account.nat_traversal.msrp_relay.host + relay_port = self.account.nat_traversal.msrp_relay.port + relay = MSRPRelaySettings(domain=self.account.uri.host, + username=self.account.uri.user, + password=self.account.credentials.password, + host=relay_host, + port=relay_port, + use_tls=self.transport=='tls') + self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) + self.local_role = 'actpass' if outgoing else 'passive' + else: + if not outgoing and self.remote_role in ('actpass', 'passive'): + # 'passive' not allowed by the RFC but play nice for interoperability. -Saul + self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) + self.local_role = 'active' + else: + if not outgoing and self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): + raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") + self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) + self.local_role = 'actpass' if outgoing else 'passive' + full_local_path = self.msrp_connector.prepare(self.local_uri) + self.local_media = self._create_local_media(full_local_path) + except api.GreenletExit: + raise + except Exception, ex: + ndata = TimestampedNotificationData(context='initialize', failure=Failure(), reason=str(ex)) + notification_center.post_notification('MediaStreamDidFail', self, ndata) + else: + notification_center.post_notification('MediaStreamDidInitialize', self, data=TimestampedNotificationData()) + finally: + if self.msrp_session is None and self.msrp is None and self.msrp_connector is None: + notification_center.remove_observer(self, sender=self) + self.greenlet = None + +class ChatStream(_ChatStream, MSRPStreamBase): accept_types = ['message/cpim'] accept_wrapped_types = ['*'] chatroom_capabilities = ['private-messages', 'com.ag-projects.screen-sharing'] @property def local_uri(self): return URI(host=SIPConfig.local_ip, port=0, use_tls=self.transport=='tls', credentials=self.account.tls_credentials) def _create_local_media(self, uri_path): local_media = MSRPStreamBase._create_local_media(self, uri_path) if self.session.local_focus and self.chatroom_capabilities: local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.chatroom_capabilities))) return local_media def _handle_SEND(self, chunk): # This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if not chunk.data: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.segment is not None: self.incoming_queue.setdefault(chunk.message_id, []).append(chunk.data) if chunk.final: chunk.data = ''.join(self.incoming_queue.pop(chunk.message_id)) else: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.content_type.lower() == 'message/cpim': try: message = CPIMMessage.parse(chunk.data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return else: if message.timestamp is None: message.timestamp = datetime.now(tzlocal()) if message.sender is None: message.sender = self.remote_identity private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity else: self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return # TODO: check wrapped content-type and issue a report if it's invalid notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: data = IsComposingDocument.parse(message.body) ndata = TimestampedNotificationData(state=data.state.value, refresh=data.refresh.value if data.refresh is not None else None, content_type=data.content_type.value if data.content_type is not None else None, last_active=data.last_active.value if data.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata) else: notification_center.post_notification('ChatStreamGotMessage', self, TimestampedNotificationData(message=message, private=private, chunk=chunk)) def send_message(self, content, content_type='text/plain', local_identity=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, notify_progress=True, success_report='yes', failure_report='yes'): if self.direction=='recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if message_id is None: message_id = '%x' % random.getrandbits(64) if not contains_mime_type(self.accept_wrapped_types, content_type): raise ChatStreamError('Invalid content_type for outgoing message: %r' % content_type) if not recipients: recipients = [self.remote_identity] if timestamp is None: timestamp = datetime.now() # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, content_type, sender=local_identity or self.local_identity, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report=failure_report, success_report=success_report, notify_progress=notify_progress) return message_id def send_composing_indication(self, state, refresh, last_active=None, recipients=None, local_identity=None, message_id=None, notify_progress=False, success_report='no', failure_report='partial'): if self.direction == 'recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if state not in ('active', 'idle'): raise ValueError('Invalid value for composing indication state') if message_id is None: message_id = '%x' % random.getrandbits(64) content = IsComposingMessage(state=State(state), refresh=Refresh(refresh), last_active=LastActive(last_active or datetime.now()), content_type=ContentType('text')).toxml() if recipients is None: recipients = [self.remote_identity] # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, IsComposingDocument.content_type, sender=local_identity or self.local_identity, recipients=recipients, timestamp=datetime.now()) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report='partial', success_report='no') return message_id diff --git a/sylk/server.py b/sylk/server.py index d1b60d5..37ff767 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,237 +1,240 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import sys from threading import Event from application import log from application.notification import NotificationCenter from eventlet import api, proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError from sipsimple.lookup import DNSManager from sipsimple.session import SessionManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from sylk.applications import IncomingRequestHandler from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = None self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryStorage()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): + # Command line options + settings = SIPSimpleSettings() + settings.bonjour.enabled = '--use-bonjour' in sys.argv # Horrible hack, I know account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager + # Disable MSRP ACM if we are using Bonjour + account.msrp.connection_model = 'relay' if settings.bonjour.enabled else 'acm' account.save() account_manager.default_account = account @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() notification_center.post_notification('SIPApplicationWillStart', sender=self, data=TimestampedNotificationData()) if self.state == 'stopping': reactor.stop() return account = account_manager.default_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events=set(['conference', 'presence']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server if account else False, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=TimestampedNotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self, data=TimestampedNotificationData()) @run_in_green_thread def _shutdown_subsystems(self): # cleanup internals if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None # shutdown SIPThor interface sipthor_proc = proc.spawn(self._stop_sipthor) sipthor_proc.wait() # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # shutdown engine engine = Engine() engine.stop() # TODO: timeout should be removed when the Engine is fixed so that it never hangs. -Saul try: with api.timeout(15): while True: notification = self._channel.wait() if notification.name == 'SIPEngineDidEnd': break except api.TimeoutError: pass # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _start_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode() def _stop_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode().stop() def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) - # Command line options - settings.bonjour.enabled = '--use-bonjour' in sys.argv # Horrible hack, I know def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass # Start request handler self.request_handler = IncomingRequestHandler() self.request_handler.start() # Start SIPThor interface proc.spawn(self._start_sipthor) def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) def _NH_ThorNetworkGotFatalError(self, notification): self.stop()