diff --git a/sipsimple/account/__init__.py b/sipsimple/account/__init__.py index cb8e2199..9ac862eb 100644 --- a/sipsimple/account/__init__.py +++ b/sipsimple/account/__init__.py @@ -1,881 +1,881 @@ """ Implements a SIP Account management system that allows the definition of multiple SIP accounts and their properties. """ __all__ = ['Account', 'BonjourAccount', 'AccountManager'] from itertools import chain from threading import Lock from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import classproperty from application.python.types import Singleton from application.system import host as Host from eventlib import coros, proc from gnutls.crypto import X509Certificate, X509PrivateKey from gnutls.errors import GNUTLSError from gnutls.interfaces.twisted import X509Credentials from zope.interface import implementer from sipsimple.account.bonjour import BonjourServices, _bonjour from sipsimple.account.publication import PresencePublisher, DialogPublisher from sipsimple.account.registration import Registrar from sipsimple.account.subscription import MWISubscriber, PresenceWinfoSubscriber, DialogWinfoSubscriber, PresenceSubscriber, SelfPresenceSubscriber, DialogSubscriber from sipsimple.account.xcap import XCAPManager from sipsimple.core import Credentials, SIPURI, ContactURIFactory from sipsimple.configuration import ConfigurationManager, Setting, SettingsGroup, SettingsObject, SettingsObjectID from sipsimple.configuration.datatypes import AudioCodecList, MSRPConnectionModel, MSRPRelayAddress, MSRPTransport, NonNegativeInteger, Path, SIPAddress, SIPProxyAddress, SRTPKeyNegotiation, STUNServerAddressList, VideoCodecList, XCAPRoot from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.payloads import ParserError from sipsimple.payloads.messagesummary import MessageSummary from sipsimple.payloads.pidf import PIDFDocument from sipsimple.payloads.rlsnotify import RLSNotify from sipsimple.payloads.watcherinfo import WatcherInfoDocument from sipsimple.threading import call_in_thread from sipsimple.threading.green import call_in_green_thread, run_in_green_thread from sipsimple.util import user_info, execute_once class AuthSettings(SettingsGroup): username = Setting(type=str, default=None, nillable=True) password = Setting(type=str, default='') class SIPSettings(SettingsGroup): always_use_my_proxy = Setting(type=bool, default=True) outbound_proxy = Setting(type=SIPProxyAddress, default=None, nillable=True) register = Setting(type=bool, default=True) register_interval = Setting(type=NonNegativeInteger, default=600) subscribe_interval = Setting(type=NonNegativeInteger, default=600) publish_interval = Setting(type=NonNegativeInteger, default=600) tls_name = Setting(type=str, default=None, nillable=True) class SRTPEncryptionSettings(SettingsGroup): enabled = Setting(type=bool, default=True) key_negotiation = Setting(type=SRTPKeyNegotiation, default='opportunistic') class RTPSettings(SettingsGroup): audio_codec_list = Setting(type=AudioCodecList, default=None, nillable=True) video_codec_list = Setting(type=VideoCodecList, default=None, nillable=True) encryption = SRTPEncryptionSettings class NATTraversalSettings(SettingsGroup): use_ice = Setting(type=bool, default=False) stun_server_list = Setting(type=STUNServerAddressList, default=None, nillable=True) msrp_relay = Setting(type=MSRPRelayAddress, default=None, nillable=True) use_msrp_relay_for_outbound = Setting(type=bool, default=False) class MessageSummarySettings(SettingsGroup): enabled = Setting(type=bool, default=False) voicemail_uri = Setting(type=SIPAddress, default=None, nillable=True) class XCAPSettings(SettingsGroup): enabled = Setting(type=bool, default=False) discovered = Setting(type=bool, default=False) xcap_root = Setting(type=XCAPRoot, default=None, nillable=True) xcap_diff = Setting(type=bool, default=True) class PresenceSettings(SettingsGroup): enabled = Setting(type=bool, default=False) class MSRPSettings(SettingsGroup): transport = Setting(type=MSRPTransport, default='tls') connection_model = Setting(type=MSRPConnectionModel, default='relay') @implementer(IObserver) class Account(SettingsObject): """ Object representing a SIP account. Contains configuration settings and attributes for accessing SIP related objects. When the account is active, it will register, publish its presence and subscribe to watcher-info events depending on its settings. If the object is un-pickled and its enabled flag was set, it will automatically activate. When the save method is called, depending on the value of the enabled flag, the account will activate/deactivate. Notifications sent by instances of Account: * CFGSettingsObjectWasCreated * CFGSettingsObjectWasActivated * CFGSettingsObjectWasDeleted * CFGSettingsObjectDidChange * SIPAccountWillActivate * SIPAccountDidActivate * SIPAccountWillDeactivate * SIPAccountDidDeactivate """ __group__ = 'Accounts' __id__ = SettingsObjectID(type=SIPAddress) id = __id__ enabled = Setting(type=bool, default=False) display_name = Setting(type=str, default=None, nillable=True) auth = AuthSettings sip = SIPSettings rtp = RTPSettings nat_traversal = NATTraversalSettings message_summary = MessageSummarySettings msrp = MSRPSettings presence = PresenceSettings xcap = XCAPSettings def __new__(cls, id): #with AccountManager.load.lock: # if not AccountManager.load.called: # raise RuntimeError("cannot instantiate %s before calling AccountManager.load" % cls.__name__) return SettingsObject.__new__(cls, id) def __init__(self, id): self.contact = ContactURIFactory() self.xcap_manager = XCAPManager(self) self._started = False self._deleted = False self._active = False self._activation_lock = coros.Semaphore(1) self._registrar = Registrar(self) self._mwi_subscriber = MWISubscriber(self) self._pwi_subscriber = PresenceWinfoSubscriber(self) self._dwi_subscriber = DialogWinfoSubscriber(self) self._presence_subscriber = PresenceSubscriber(self) self._self_presence_subscriber = SelfPresenceSubscriber(self) self._dialog_subscriber = DialogSubscriber(self) self._presence_publisher = PresencePublisher(self) self._dialog_publisher = DialogPublisher(self) self._mwi_voicemail_uri = None self._pwi_version = None self._dwi_version = None self._presence_version = None self._dialog_version = None self.trusted_cas = [] self.ca_list = None def start(self): if self._started or self._deleted: return self._started = True notification_center = NotificationCenter() notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self) notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.add_observer(self, name='XCAPManagerDidDiscoverServerCapabilities', sender=self.xcap_manager) notification_center.add_observer(self, sender=self._mwi_subscriber) notification_center.add_observer(self, sender=self._pwi_subscriber) notification_center.add_observer(self, sender=self._dwi_subscriber) notification_center.add_observer(self, sender=self._presence_subscriber) notification_center.add_observer(self, sender=self._self_presence_subscriber) notification_center.add_observer(self, sender=self._dialog_subscriber) self.xcap_manager.init() if self.enabled: self._activate() def stop(self): if not self._started: return self._started = False self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self) notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.remove_observer(self, name='XCAPManagerDidDiscoverServerCapabilities', sender=self.xcap_manager) notification_center.remove_observer(self, sender=self._mwi_subscriber) notification_center.remove_observer(self, sender=self._pwi_subscriber) notification_center.remove_observer(self, sender=self._dwi_subscriber) notification_center.remove_observer(self, sender=self._presence_subscriber) notification_center.remove_observer(self, sender=self._self_presence_subscriber) notification_center.remove_observer(self, sender=self._dialog_subscriber) def delete(self): if self._deleted: return self._deleted = True self.stop() self._registrar = None self._mwi_subscriber = None self._pwi_subscriber = None self._dwi_subscriber = None self._presence_subscriber = None self._self_presence_subscriber = None self._dialog_subscriber = None self._presence_publisher = None self._dialog_publisher = None self.xcap_manager = None SettingsObject.delete(self) @run_in_green_thread def reregister(self): if self._started: self._registrar.reregister() @run_in_green_thread def resubscribe(self): if self._started: self._mwi_subscriber.resubscribe() self._pwi_subscriber.resubscribe() self._dwi_subscriber.resubscribe() self._presence_subscriber.resubscribe() self._self_presence_subscriber.resubscribe() self._dialog_subscriber.resubscribe() @property def credentials(self): username = self.auth.username or self.id.username username = username.encode() if username else None password = self.auth.password.encode() if self.auth.password else None return Credentials(username, password) @property def registered(self): try: return self._registrar.registered except AttributeError: return False @property def mwi_active(self): try: return self._mwi_subscriber.subscribed except AttributeError: return False @property def tls_credentials(self): # This property can be optimized to cache the credentials it loads from disk, # however this is not a time consuming operation (~ 3000 req/sec). -Luci settings = SIPSimpleSettings() tls_certificate = settings.tls.certificate certificate = None private_key = None if tls_certificate is not None: try: certificate_data = open(tls_certificate.normalized).read() certificate = X509Certificate(certificate_data) private_key = X509PrivateKey(certificate_data) except (FileNotFoundError, GNUTLSError, UnicodeDecodeError): pass trusted_cas = [] ca_list = settings.tls.ca_list if ca_list is not None: if len(self.trusted_cas) > 0: trusted_cas = self.trusted_cas else: crt = None start = False try: ca_text = open(ca_list.normalized).read() except (FileNotFoundError, GNUTLSError, UnicodeDecodeError): ca_text = '' for line in ca_text.split("\n"): if "BEGIN CERT" in line: start = True crt = line + "\n" elif "END CERT" in line: crt = crt + line + "\n" end = True start = False try: trusted_cas.append(X509Certificate(crt)) except (GNUTLSError, ValueError) as e: continue elif start: crt = crt + line + "\n" self.trusted_cas = trusted_cas self.ca_list = ca_list credentials = X509Credentials(certificate, private_key, trusted_cas) - credentials.verify_peer = settings.tls.verify_peer + credentials.verify_server = settings.tls.verify_server return credentials @property def uri(self): return SIPURI(user=self.id.username, host=self.id.domain) @property def voicemail_uri(self): return self._mwi_voicemail_uri or self.message_summary.voicemail_uri @property def presence_state(self): try: return self._presence_publisher.state except AttributeError: return None @presence_state.setter def presence_state(self, state): try: self._presence_publisher.state = state except AttributeError: pass @property def dialog_state(self): try: return self._dialog_publisher.state except AttributeError: return None @dialog_state.setter def dialog_state(self, state): try: self._dialog_publisher.state = state except AttributeError: pass def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if self._started and 'enabled' in notification.data.modified: if self.enabled: self._activate() else: self._deactivate() def _NH_XCAPManagerDidDiscoverServerCapabilities(self, notification): if self._started and self.xcap.discovered is False: self.xcap.discovered = True self.save() notification.center.post_notification('SIPAccountDidDiscoverXCAPSupport', sender=self) def _NH_MWISubscriberDidDeactivate(self, notification): self._mwi_voicemail_uri = None def _NH_MWISubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == MessageSummary.content_type: try: message_summary = MessageSummary.parse(body) except ParserError: pass else: self._mwi_voicemail_uri = message_summary.message_account and SIPAddress(message_summary.message_account.replace('sip:', '', 1)) or None notification.center.post_notification('SIPAccountGotMessageSummary', sender=self, data=NotificationData(message_summary=message_summary)) def _NH_PresenceWinfoSubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == WatcherInfoDocument.content_type: try: watcher_info = WatcherInfoDocument.parse(body) watcher_list = watcher_info['sip:' + self.id] except (ParserError, KeyError): pass else: if watcher_list.package != 'presence': return if self._pwi_version is None: if watcher_info.state == 'partial': self._pwi_subscriber.resubscribe() elif watcher_info.version <= self._pwi_version: return elif watcher_info.state == 'partial' and watcher_info.version > self._pwi_version + 1: self._pwi_subscriber.resubscribe() self._pwi_version = watcher_info.version data = NotificationData(version=watcher_info.version, state=watcher_info.state, watcher_list=watcher_list) notification.center.post_notification('SIPAccountGotPresenceWinfo', sender=self, data=data) def _NH_PresenceWinfoSubscriptionDidEnd(self, notification): self._pwi_version = None def _NH_PresenceWinfoSubscriptionDidFail(self, notification): self._pwi_version = None def _NH_DialogWinfoSubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == WatcherInfoDocument.content_type: try: watcher_info = WatcherInfoDocument.parse(body) watcher_list = watcher_info['sip:' + self.id] except (ParserError, KeyError): pass else: if watcher_list.package != 'dialog': return if self._dwi_version is None: if watcher_info.state == 'partial': self._dwi_subscriber.resubscribe() elif watcher_info.version <= self._dwi_version: return elif watcher_info.state == 'partial' and watcher_info.version > self._dwi_version + 1: self._dwi_subscriber.resubscribe() self._dwi_version = watcher_info.version data = NotificationData(version=watcher_info.version, state=watcher_info.state, watcher_list=watcher_list) notification.center.post_notification('SIPAccountGotDialogWinfo', sender=self, data=data) def _NH_DialogWinfoSubscriptionDidEnd(self, notification): self._dwi_version = None def _NH_DialogWinfoSubscriptionDidFail(self, notification): self._dwi_version = None def _NH_PresenceSubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == RLSNotify.content_type: try: rls_notify = RLSNotify.parse('{content_type}\r\n\r\n{body}'.format(content_type=notification.data.headers['Content-Type'], body=body)) except ParserError: pass else: if rls_notify.uri != self.xcap_manager.rls_presence_uri: return if self._presence_version is None: if not rls_notify.full_state: self._presence_subscriber.resubscribe() elif rls_notify.version <= self._presence_version: return elif not rls_notify.full_state and rls_notify.version > self._presence_version + 1: self._presence_subscriber.resubscribe() self._presence_version = rls_notify.version data = NotificationData(version=rls_notify.version, full_state=rls_notify.full_state, resource_map=dict((str(resource.uri), resource) for resource in rls_notify)) notification.center.post_notification('SIPAccountGotPresenceState', sender=self, data=data) def _NH_PresenceSubscriptionDidEnd(self, notification): self._presence_version = None def _NH_PresenceSubscriptionDidFail(self, notification): self._presence_version = None def _NH_SelfPresenceSubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == PIDFDocument.content_type: try: pidf_doc = PIDFDocument.parse(body) except ParserError: pass else: if pidf_doc.entity.partition('sip:')[2] != self.id: return notification.center.post_notification('SIPAccountGotSelfPresenceState', sender=self, data=NotificationData(pidf=pidf_doc)) def _NH_DialogSubscriptionGotNotify(self, notification): body = notification.data.body.decode() if notification.data.body else None if body and notification.data.content_type == RLSNotify.content_type: try: rls_notify = RLSNotify.parse('{content_type}\r\n\r\n{body}'.format(content_type=notification.data.headers['Content-Type'], body=body)) except ParserError: pass else: if rls_notify.uri != self.xcap_manager.rls_dialog_uri: return if self._dialog_version is None: if not rls_notify.full_state: self._dialog_subscriber.resubscribe() elif rls_notify.version <= self._dialog_version: return elif not rls_notify.full_state and rls_notify.version > self._dialog_version + 1: self._dialog_subscriber.resubscribe() self._dialog_version = rls_notify.version data = NotificationData(version=rls_notify.version, full_state=rls_notify.full_state, resource_map=dict((resource.uri, resource) for resource in rls_notify)) notification.center.post_notification('SIPAccountGotDialogState', sender=self, data=data) def _NH_DialogSubscriptionDidEnd(self, notification): self._dialog_version = None def _NH_DialogSubscriptionDidFail(self, notification): self._dialog_version = None def _activate(self): with self._activation_lock: if self._active: return notification_center = NotificationCenter() notification_center.post_notification('SIPAccountWillActivate', sender=self) self._active = True self._registrar.start() self._mwi_subscriber.start() self._pwi_subscriber.start() self._dwi_subscriber.start() self._presence_subscriber.start() self._self_presence_subscriber.start() self._dialog_subscriber.start() self._presence_publisher.start() self._dialog_publisher.start() if self.xcap.enabled: self.xcap_manager.start() notification_center.post_notification('SIPAccountDidActivate', sender=self) def _deactivate(self): with self._activation_lock: if not self._active: return notification_center = NotificationCenter() notification_center.post_notification('SIPAccountWillDeactivate', sender=self) self._active = False handlers = [self._registrar, self._mwi_subscriber, self._pwi_subscriber, self._dwi_subscriber, self._presence_subscriber, self._self_presence_subscriber, self._dialog_subscriber, self._presence_publisher, self._dialog_publisher, self.xcap_manager] proc.waitall([proc.spawn(handler.stop) for handler in handlers]) notification_center.post_notification('SIPAccountDidDeactivate', sender=self) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.id) def __setstate__(self, data): # This restores the password from its previous location as a top level setting # after it was moved under the auth group. SettingsObject.__setstate__(self, data) if not data.get('auth', {}).get('password') and data.get('password'): self.auth.password = data.pop('password') self.save() class BonjourMSRPSettings(SettingsGroup): transport = Setting(type=MSRPTransport, default='tls') class BonjourAccountEnabledSetting(Setting): def __get__(self, obj, objtype): if obj is None: return self return _bonjour.available and self.values.get(obj, self.default) def __set__(self, obj, value): if not _bonjour.available: raise RuntimeError('mdns support is not available') Setting.__set__(self, obj, value) @implementer(IObserver) class BonjourAccount(SettingsObject): """ Object representing a bonjour account. Contains configuration settings and attributes for accessing bonjour related options. When the account is active, it will send broadcast its contact address on the LAN. If the object is un-pickled and its enabled flag was set, it will automatically activate. When the save method is called, depending on the value of the enabled flag, the account will activate/deactivate. Notifications sent by instances of Account: * CFGSettingsObjectWasCreated * CFGSettingsObjectWasActivated * CFGSettingsObjectWasDeleted * CFGSettingsObjectDidChange * SIPAccountWillActivate * SIPAccountDidActivate * SIPAccountWillDeactivate * SIPAccountDidDeactivate """ __group__ = 'Accounts' __id__ = SIPAddress('bonjour@local') id = property(lambda self: self.__id__) enabled = BonjourAccountEnabledSetting(type=bool, default=True) display_name = Setting(type=str, default=user_info.fullname, nillable=False) msrp = BonjourMSRPSettings presence = PresenceSettings rtp = RTPSettings def __new__(cls): # with AccountManager.load.lock: # if not AccountManager.load.called: # raise RuntimeError("cannot instantiate %s before calling AccountManager.load" % cls.__name__) return SettingsObject.__new__(cls) def __init__(self): self.contact = ContactURIFactory() self.credentials = None self._started = False self._active = False self._activation_lock = coros.Semaphore(1) self._bonjour_services = BonjourServices(self) # initialize fake settings (these are here to make the bonjour account quack like a duck) self.nat_traversal = NATTraversalSettings() self.nat_traversal.use_ice = False self.nat_traversal.msrp_relay = None self.nat_traversal.use_msrp_relay_for_outbound = False self.xcap = XCAPSettings() self.xcap.enabled = False self.xcap.discovered = False self.xcap.xcap_root = None def __repr__(self): return '%s()' % self.__class__.__name__ def start(self): if self._started: return self._started = True notification_center = NotificationCenter() notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self) notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) self._bonjour_services.start() if self.enabled: self._activate() def stop(self): if not self._started: return self._started = False self._deactivate() self._bonjour_services.stop() notification_center = NotificationCenter() notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self) notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) @classproperty def mdns_available(cls): return _bonjour.available @property def registered(self): return False @property def tls_credentials(self): # This property can be optimized to cache the credentials it loads from disk, # however this is not a time consuming operation (~ 3000 req/sec). -Luci settings = SIPSimpleSettings() tls_certificate = settings.tls.certificate if tls_certificate is not None: certificate_data = open(tls_certificate.normalized).read() certificate = X509Certificate(certificate_data) private_key = X509PrivateKey(certificate_data) else: certificate = None private_key = None credentials = X509Credentials(certificate, private_key, []) - credentials.verify_peer = False + credentials.verify_server = False return credentials @property def uri(self): return SIPURI(user=self.contact.username, host=Host.default_ip or '127.0.0.1') @property def presence_state(self): return self._bonjour_services.presence_state @presence_state.setter def presence_state(self, state): self._bonjour_services.presence_state = state def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if self._started: if 'enabled' in notification.data.modified: if self.enabled: self._activate() else: self._deactivate() elif self.enabled: if 'display_name' in notification.data.modified: self._bonjour_services.update_registrations() if {'sip.transport_list', 'tls.certificate'}.intersection(notification.data.modified): self._bonjour_services.update_registrations() self._bonjour_services.restart_discovery() def _activate(self): with self._activation_lock: if self._active: return notification_center = NotificationCenter() notification_center.post_notification('SIPAccountWillActivate', sender=self) self._active = True self._bonjour_services.activate() notification_center.post_notification('SIPAccountDidActivate', sender=self) def _deactivate(self): with self._activation_lock: if not self._active: return notification_center = NotificationCenter() notification_center.post_notification('SIPAccountWillDeactivate', sender=self) self._active = False self._bonjour_services.deactivate() notification_center.post_notification('SIPAccountDidDeactivate', sender=self) @implementer(IObserver) class AccountManager(object, metaclass=Singleton): """ This is a singleton object which manages all the SIP accounts. It is also used to manage the default account (the one used for outbound sessions) using the default_account attribute: manager = AccountManager() manager.default_account = manager.get_account('alice@example.net') The following notifications are sent: * SIPAccountManagerDidRemoveAccount * SIPAccountManagerDidAddAccount * SIPAccountManagerDidChangeDefaultAccount """ def __init__(self): self._lock = Lock() self.accounts = {} notification_center = NotificationCenter() notification_center.add_observer(self, name='CFGSettingsObjectWasActivated') notification_center.add_observer(self, name='CFGSettingsObjectWasCreated') @execute_once def load(self): """ Load all accounts from the configuration. The accounts will not be started until the start method is called. """ configuration = ConfigurationManager() bonjour_account = BonjourAccount() names = configuration.get_names([Account.__group__]) [Account(id) for id in names if id != bonjour_account.id] default_account = self.default_account if default_account is None or not default_account.enabled: try: self.default_account = next((account for account in list(self.accounts.values()) if account.enabled)) except StopIteration: self.default_account = None def start(self): """ Start the accounts, which will determine the ones with the enabled flag set to activate. """ notification_center = NotificationCenter() notification_center.post_notification('SIPAccountManagerWillStart', sender=self) proc.waitall([proc.spawn(account.start) for account in list(self.accounts.values())]) notification_center.post_notification('SIPAccountManagerDidStart', sender=self) def stop(self): """ Stop the accounts, which will determine the ones that were enabled to deactivate. This method returns only once the accounts were stopped successfully or they timed out trying. """ notification_center = NotificationCenter() notification_center.post_notification('SIPAccountManagerWillEnd', sender=self) proc.waitall([proc.spawn(account.stop) for account in list(self.accounts.values())]) notification_center.post_notification('SIPAccountManagerDidEnd', sender=self) def has_account(self, id): return id in self.accounts def get_account(self, id): return self.accounts[id] def get_accounts(self): return list(self.accounts.values()) def iter_accounts(self): return iter(list(self.accounts.values())) def find_account(self, contact_uri): # compare contact_address with account contact exact_matches = (account for account in list(self.accounts.values()) if account.enabled and account.contact.username==contact_uri.user) # compare username in contact URI with account username loose_matches = (account for account in list(self.accounts.values()) if account.enabled and account.id.username==contact_uri.user) return next(chain(exact_matches, loose_matches, [None])) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_CFGSettingsObjectWasActivated(self, notification): if isinstance(notification.sender, Account) or (isinstance(notification.sender, BonjourAccount) and _bonjour.available): account = notification.sender self.accounts[account.id] = account notification.center.add_observer(self, sender=account, name='CFGSettingsObjectDidChange') notification.center.add_observer(self, sender=account, name='CFGSettingsObjectWasDeleted') notification.center.post_notification('SIPAccountManagerDidAddAccount', sender=self, data=NotificationData(account=account)) from sipsimple.application import SIPApplication if SIPApplication.running: call_in_green_thread(account.start) def _NH_CFGSettingsObjectWasCreated(self, notification): if isinstance(notification.sender, Account): account = notification.sender if account.enabled and self.default_account is None: self.default_account = account def _NH_CFGSettingsObjectWasDeleted(self, notification): account = notification.sender del self.accounts[account.id] notification.center.remove_observer(self, sender=account, name='CFGSettingsObjectDidChange') notification.center.remove_observer(self, sender=account, name='CFGSettingsObjectWasDeleted') notification.center.post_notification('SIPAccountManagerDidRemoveAccount', sender=self, data=NotificationData(account=account)) def _NH_CFGSettingsObjectDidChange(self, notification): account = notification.sender if '__id__' in notification.data.modified: modified_id = notification.data.modified['__id__'] self.accounts[modified_id.new] = self.accounts.pop(modified_id.old) if 'enabled' in notification.data.modified: if account.enabled and self.default_account is None: self.default_account = account elif not account.enabled and self.default_account is account: try: self.default_account = next((account for account in list(self.accounts.values()) if account.enabled)) except StopIteration: self.default_account = None @property def default_account(self): settings = SIPSimpleSettings() return self.accounts.get(settings.default_account, None) @default_account.setter def default_account(self, account): if account is not None and not account.enabled: raise ValueError("account %s is not enabled" % account.id) notification_center = NotificationCenter() settings = SIPSimpleSettings() with self._lock: old_account = self.accounts.get(settings.default_account, None) if account is old_account: return if account is None: settings.default_account = None else: settings.default_account = account.id settings.save() # we need to post the notification in the file-io thread in order to have it serialized after the # SIPAccountManagerDidAddAccount notification that is triggered when the account is saved the first # time, because save is executed in the file-io thread while this runs in the current thread. -Dan call_in_thread('file-io', notification_center.post_notification, 'SIPAccountManagerDidChangeDefaultAccount', sender=self, data=NotificationData(old_account=old_account, account=account)) diff --git a/sipsimple/streams/msrp/__init__.py b/sipsimple/streams/msrp/__init__.py index ac6bf2ba..5fdce851 100644 --- a/sipsimple/streams/msrp/__init__.py +++ b/sipsimple/streams/msrp/__init__.py @@ -1,424 +1,426 @@ """ Handling of MSRP media streams according to RFC4975, RFC4976, RFC5547 and RFC3994. """ __all__ = ['MSRPStreamError', 'MSRPStreamBase'] import traceback + from application.notification import NotificationCenter, NotificationData, IObserver from application.python import Null from application.system import host from gnutls.errors import CertificateAuthorityError, CertificateError, CertificateRevokedError from twisted.internet.error import ConnectionDone from zope.interface import implementer from eventlib import api from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from msrplib.protocol import URI from msrplib.session import contains_mime_type from sipsimple.account import Account, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute, SDPConnection, SDPMediaStream from sipsimple.streams import IMediaStream, MediaStreamType, StreamError from sipsimple.threading.green import run_in_green_thread from gnutls.errors import CertificateError, CertificateAuthorityError, CertificateExpiredError, CertificateSecurityError, CertificateRevokedError class MSRPStreamError(StreamError): pass @implementer(IMediaStream, IObserver) class MSRPStreamBase(object, metaclass=MediaStreamType): # Attributes that need to be defined by each MSRP stream type type = None priority = None msrp_session_class = None media_type = None accept_types = None accept_wrapped_types = None # These attributes are always False for any MSRP stream hold_supported = False on_hold = False on_hold_by_local = False on_hold_by_remote = False def __new__(cls, *args, **kw): if cls is MSRPStreamBase: raise TypeError("MSRPStreamBase cannot be instantiated directly") return object.__new__(cls) def __init__(self, direction='sendrecv'): self.direction = direction self.greenlet = None self.local_media = None self.remote_media = None self.msrp = None # Placeholder for the MSRPTransport that will be set when started self.msrp_connector = None self.cpim_enabled = None # Boolean value. None means it was not negotiated yet self.session = None self.msrp_session = None self.shutting_down = False self.local_role = None self.remote_role = None self.transport = None self.remote_accept_types = None self.remote_accept_wrapped_types = None self._initialize_done = False self._done = False self._failure_reason = None @property def local_uri(self): msrp = self.msrp or self.msrp_connector return msrp.local_uri if msrp is not None else None def _create_local_media(self, uri_path): transport = "TCP/TLS/MSRP" if uri_path[-1].use_tls else "TCP/MSRP" attributes = [] path = " ".join(str(uri) for uri in uri_path) attributes.append(SDPAttribute(b"path", path.encode())) if self.direction not in [None, 'sendrecv']: attributes.append(SDPAttribute(self.direction.encode(), b'')) if self.accept_types is not None: a_types = " ".join(self.accept_types) attributes.append(SDPAttribute(b"accept-types", a_types.encode())) if self.accept_wrapped_types is not None: a_w_types = " ".join(self.accept_wrapped_types) attributes.append(SDPAttribute(b"accept-wrapped-types", a_w_types.encode())) attributes.append(SDPAttribute(b"setup", self.local_role.encode() if self.local_role else None)) local_ip = uri_path[-1].host connection = SDPConnection(local_ip.encode()) return SDPMediaStream(self.media_type.encode(), uri_path[-1].port or 2855, transport.encode(), connection=connection, formats=[b"*"], attributes=attributes) # The public API (the IMediaStream interface) # noinspection PyUnusedLocal def get_local_media(self, remote_sdp=None, index=0): return self.local_media def new_from_sdp(self, session, remote_sdp, stream_index): raise NotImplementedError @run_in_green_thread def initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) + settings = SIPSimpleSettings() try: self.session = session self.transport = self.session.account.msrp.transport outgoing = direction == 'outgoing' logger = NotificationProxyLogger() if self.session.account is BonjourAccount(): if outgoing: self.msrp_connector = DirectConnector(logger=logger) self.local_role = 'active' else: - if self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): + if self.transport == 'tls' and settings.tls.certificate is None: raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' else: if self.session.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif outgoing and not self.session.account.nat_traversal.use_msrp_relay_for_outbound: self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if self.session.account.nat_traversal.msrp_relay is None: relay_host = relay_port = None else: if self.transport != self.session.account.nat_traversal.msrp_relay.transport: raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") relay_host = self.session.account.nat_traversal.msrp_relay.host relay_port = self.session.account.nat_traversal.msrp_relay.port relay = MSRPRelaySettings(domain=self.session.account.uri.host.decode(), username=self.session.account.uri.user.decode(), password=self.session.account.credentials.password.decode(), host=relay_host, port=relay_port, use_tls=self.transport=='tls') self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: - if not outgoing and self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): + if not outgoing and self.transport == 'tls' and settings.tls.certificate is None: raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(local_uri=URI(host=host.default_ip, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials)) self.local_media = self._create_local_media(full_local_path) except (CertificateError, CertificateAuthorityError, CertificateExpiredError, CertificateSecurityError, CertificateRevokedError) as e: reason = "%s for %s" % (e.error, e.certificate.subject.CN.lower()) notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=reason)) except Exception as e: notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: notification_center.post_notification('MediaStreamDidInitialize', sender=self) finally: self._initialize_done = True self.greenlet = None # noinspection PyUnusedLocal @run_in_green_thread def start(self, local_sdp, remote_sdp, stream_index): self.greenlet = api.getcurrent() notification_center = NotificationCenter() context = 'sdp_negotiation' try: remote_media = remote_sdp.media[stream_index] self.remote_media = remote_media self.remote_accept_types = remote_media.attributes.getfirst(b'accept-types', b'').decode().split() self.remote_accept_wrapped_types = remote_media.attributes.getfirst(b'accept-wrapped-types', b'').decode().split() self.cpim_enabled = contains_mime_type(self.accept_types, 'message/cpim') and contains_mime_type(self.remote_accept_types, 'message/cpim') remote_uri_path = remote_media.attributes.getfirst(b'path') if remote_uri_path is None: raise AttributeError("remote SDP media does not have 'path' attribute") full_remote_path = [URI.parse(uri) for uri in remote_uri_path.decode().split()] remote_transport = 'tls' if full_remote_path[0].use_tls else 'tcp' if self.transport != remote_transport: raise MSRPStreamError("remote transport ('%s') different from local transport ('%s')" % (remote_transport, self.transport)) if isinstance(self.session.account, Account) and self.local_role == 'actpass': remote_setup = remote_media.attributes.getfirst('setup', 'passive') if remote_setup == 'passive': # If actpass is offered connectors are always started as passive # We need to switch to active if the remote answers with passive if self.session.account.msrp.connection_model == 'relay': self.msrp_connector.mode = 'active' else: local_uri = self.msrp_connector.local_uri logger = self.msrp_connector.logger self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.msrp_connector.prepare(local_uri) context = 'start' self.msrp = self.msrp_connector.complete(full_remote_path) if self.msrp_session_class is not None: self.msrp_session = self.msrp_session_class(self.msrp, accept_types=self.accept_types, on_incoming_cb=self._handle_incoming, automatic_reports=False) self.msrp_connector = None except (CertificateAuthorityError, CertificateError, CertificateRevokedError) as e: peer = '%s:%s' % (full_remote_path[0].host, full_remote_path[0].port) self._failure_reason = "Peer %s: %s" % (peer, e.error) notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context=context, reason=self._failure_reason)) except Exception as e: #traceback.print_exc() self._failure_reason = str(e) notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context=context, reason=self._failure_reason)) else: notification_center.post_notification('MediaStreamDidStart', sender=self) finally: self.greenlet = None def deactivate(self): self.shutting_down = True @run_in_green_thread def end(self): if self._done: return self._done = True notification_center = NotificationCenter() if not self._initialize_done: # we are in the middle of initialize() try: msrp_connector = self.msrp_connector if self.greenlet is not None: api.kill(self.greenlet) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted')) notification_center.remove_observer(self, sender=self) self.msrp_connector = None self.greenlet = None else: notification_center.post_notification('MediaStreamWillEnd', sender=self) msrp = self.msrp msrp_session = self.msrp_session msrp_connector = self.msrp_connector try: if self.greenlet is not None: api.kill(self.greenlet) if msrp_session is not None: msrp_session.shutdown() elif msrp is not None: msrp.loseConnection(wait=False) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) notification_center.remove_observer(self, sender=self) self.msrp = None self.msrp_session = None self.msrp_connector = None self.session = None self.greenlet = None # noinspection PyMethodMayBeStatic,PyUnusedLocal def validate_update(self, remote_sdp, stream_index): return True # TODO def update(self, local_sdp, remote_sdp, stream_index): pass # TODO def hold(self): pass def unhold(self): pass def reset(self, stream_index): pass # Internal IObserver interface def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Internal message handlers def _handle_incoming(self, chunk=None, error=None): notification_center = NotificationCenter() if error is not None: if self.shutting_down and isinstance(error.value, ConnectionDone): return self._failure_reason = error.getErrorMessage() notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) elif chunk is not None: method_handler = getattr(self, '_handle_%s' % chunk.method, None) if method_handler is not None: method_handler(chunk) def _handle_REPORT(self, chunk): pass def _handle_SEND(self, chunk): pass # temporary solution. to be replaced later by a better logging system in msrplib -Dan # class ChunkInfo(object): __slots__ = 'content_type', 'header', 'footer', 'data' def __init__(self, content_type, header='', footer='', data=''): self.content_type = content_type self.header = header self.footer = footer self.data = data def __repr__(self): return "{0.__class__.__name__}(content_type={0.content_type!r}, header={0.header!r}, footer={0.footer!r}, data={0.data!r})".format(self) @property def content(self): return self.header + self.data + self.footer @property def normalized_content(self): header = self.header.decode() if isinstance(self.header, bytes) else self.header footer = self.footer.decode() if isinstance(self.footer, bytes) else self.footer try: data = self.data.decode() if isinstance(self.data, bytes) else self.data except UnicodeDecodeError: data = '<<>>' if not data: return header + footer elif self.content_type == 'message/cpim': headers, sep, body = data.partition('\r\n\r\n') if not sep: return header + data + footer mime_headers, mime_sep, mime_body = body.partition('\n\n') if not mime_sep: return header + data + footer for mime_header in mime_headers.lower().splitlines(): if mime_header.startswith('content-type:'): wrapped_content_type = mime_header[13:].partition(';')[0].strip() break else: wrapped_content_type = None if wrapped_content_type is None or wrapped_content_type == 'application/im-iscomposing+xml' or wrapped_content_type.startswith(('text/', 'message/')): data = data else: data = headers + sep + mime_headers + mime_sep + '<<>>' return header + data + footer elif self.content_type is None or self.content_type == 'application/im-iscomposing+xml' or self.content_type.startswith(('text/', 'message/')): return header + data + footer else: return header + '<<>>' + footer class NotificationProxyLogger(object): def __init__(self): from application import log self.level = log.level self.notification_center = NotificationCenter() self.log_settings = SIPSimpleSettings().logs def received_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.chunk_header, footer=data.chunk_footer, data=data.data) notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def sent_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.encoded_header, footer=data.encoded_footer, data=data.data) notification_data = NotificationData(direction='outgoing', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def received_illegal_data(self, data, transport): if self.log_settings.trace_msrp: notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=data, illegal=True) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def debug(self, message, *args, **kw): pass def info(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.INFO)) def warning(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.WARNING)) warn = warning def error(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.ERROR)) def exception(self, message='', *args, **kw): if self.log_settings.trace_msrp: message = message % args if args else message exception = traceback.format_exc() self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message + '\n' + exception if message else exception, level=self.level.ERROR)) def critical(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.CRITICAL)) fatal = critical from sipsimple.streams.msrp import chat, filetransfer, screensharing