diff --git a/sipsimple/addressbook.py b/sipsimple/addressbook.py index e5e2174b..7fc54812 100644 --- a/sipsimple/addressbook.py +++ b/sipsimple/addressbook.py @@ -1,1365 +1,1365 @@ """Implementation of an addressbook management system""" __all__ = ['AddressbookManager', 'Contact', 'ContactURI', 'Group', 'Policy', 'SharedSetting', 'ContactExtension', 'ContactURIExtension', 'GroupExtension', 'PolicyExtension'] from functools import reduce from operator import attrgetter from random import randint from .threading import Lock from time import time from zope.interface import implementer -from .application.notification import IObserver, NotificationCenter, NotificationData -from .application.python import Null -from .application.python.decorator import execute_once -from .application.python.types import Singleton, MarkerType -from .application.python.weakref import defaultweakobjectmap +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null +from application.python.decorator import execute_once +from application.python.types import Singleton, MarkerType +from application.python.weakref import defaultweakobjectmap from sipsimple import log from sipsimple.account import xcap, AccountManager from sipsimple.configuration import ConfigurationManager, ObjectNotFoundError, DuplicateIDError, PersistentKey, ModifiedValue, ModifiedList from sipsimple.configuration import AbstractSetting, RuntimeSetting, SettingsObjectImmutableID, SettingsGroup, SettingsGroupMeta, SettingsState, ItemCollection, ItemManagement from sipsimple.payloads.addressbook import PolicyValue, ElementAttributes from sipsimple.payloads.datatypes import ID from sipsimple.payloads.resourcelists import ResourceListsDocument from sipsimple.threading import run_in_thread def unique_id(prefix='id'): return "%s%d%06d" % (prefix, time()*1e6, randint(0, 999999)) def recursive_getattr(obj, name): return reduce(getattr, name.split('.'), obj) class Local(object, metaclass=MarkerType): pass class Remote(object): def __init__(self, account, xcap_object): self.account = account self.xcap_object = xcap_object def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self.account, self.xcap_object) class Setting(AbstractSetting): """ Descriptor representing a setting in an addressbook object. Unlike a standard Setting, this one will only use the default value as a template to fill in a missing value and explicitly set it when saving if it was not specified explicitly prior to that. """ def __init__(self, type, default=None, nillable=False): if default is None and not nillable: raise TypeError("default must be specified if object is not nillable") self.type = type self.default = default self.nillable = nillable self.values = defaultweakobjectmap(lambda: default) self.oldvalues = defaultweakobjectmap(lambda: default) self.dirty = defaultweakobjectmap(bool) self.lock = Lock() def __get__(self, obj, objtype): if obj is None: return self with self.lock: return self.values[obj] def __set__(self, obj, value): if value is None and not self.nillable: raise ValueError("setting attribute is not nillable") if value is not None and not isinstance(value, self.type): value = self.type(value) with self.lock: self.values[obj] = value self.dirty[obj] = value != self.oldvalues[obj] def __getstate__(self, obj): with self.lock: value = self.values[obj] if value is None: pass elif issubclass(self.type, bool): value = 'true' if value else 'false' elif issubclass(self.type, (int, int, str)): value = str(value) elif hasattr(value, '__getstate__'): value = value.__getstate__() else: value = str(value) return value def __setstate__(self, obj, value): if value is None and not self.nillable: raise ValueError("setting attribute is not nillable") if value is None: pass elif issubclass(self.type, bool): if value.lower() in ('true', 'yes', 'on', '1'): value = True elif value.lower() in ('false', 'no', 'off', '0'): value = False else: raise ValueError("invalid boolean value: %s" % (value,)) elif issubclass(self.type, (int, int, str)): value = self.type(value) elif hasattr(self.type, '__setstate__'): object = self.type.__new__(self.type) object.__setstate__(value) value = object else: value = self.type(value) with self.lock: self.oldvalues[obj] = self.values[obj] = value self.dirty[obj] = False def get_modified(self, obj): with self.lock: try: if self.dirty[obj]: return ModifiedValue(old=self.oldvalues[obj], new=self.values[obj]) else: return None finally: self.oldvalues[obj] = self.values[obj] self.dirty[obj] = False def get_old(self, obj): with self.lock: return self.oldvalues[obj] def undo(self, obj): with self.lock: self.values[obj] = self.oldvalues[obj] self.dirty[obj] = False class SharedSetting(Setting): """A setting that is shared by being also stored remotely in XCAP""" __namespace__ = None @classmethod def set_namespace(cls, namespace): """ Set the XML namespace to be used for the extra shared attributes of a contact, when storing it in XCAP """ if cls.__namespace__ is not None: raise RuntimeError("namespace already set to %s" % cls.__namespace__) cls.__namespace__ = namespace class ApplicationElementAttributes(ElementAttributes): _xml_namespace = 'urn:%s:xml:ns:addressbook' % namespace ResourceListsDocument.unregister_namespace(ElementAttributes._xml_namespace) ResourceListsDocument.register_namespace(ApplicationElementAttributes._xml_namespace, prefix=namespace.rpartition(':')[2]) for cls, attribute_name in ((cls, name) for cls in list(ResourceListsDocument.element_map.values()) for name, elem in list(cls._xml_element_children.items()) if elem.type is ElementAttributes): cls.unregister_extension(attribute_name) cls.register_extension(attribute_name, ApplicationElementAttributes) class AddressbookKey(object): def __init__(self, section): self.group = 'Addressbook' self.section = section def __get__(self, obj, objtype): if obj is None: return [self.group, self.section] else: return [self.group, self.section, PersistentKey(obj.__id__)] def __set__(self, obj, value): raise AttributeError('cannot set attribute') def __delete__(self, obj): raise AttributeError('cannot delete attribute') class MultiAccountTransaction(object): def __init__(self, accounts): self.accounts = accounts def __enter__(self): for account in self.accounts: account.xcap_manager.start_transaction() return self def __exit__(self, exc_type, exc_value, traceback): for account in self.accounts: account.xcap_manager.commit_transaction() def __iter__(self): return iter(self.accounts) class XCAPGroup(xcap.Group): """An XCAP Group with attributes normalized to unicode""" __attributes__ = set() def __init__(self, id, name, contacts, **attributes): normalized_attributes = dict((name, str(value) if value is not None else None) for name, value in list(attributes.items()) if name in self.__attributes__) contacts = [XCAPContact.normalize(contact) for contact in contacts] super(XCAPGroup, self).__init__(id, name, contacts, **normalized_attributes) @classmethod def normalize(cls, group): return cls(group.id, group.name, group.contacts, **group.attributes) def get_modified(self, modified_keys): names = {'name'} attributes = dict((name, getattr(self, name)) for name in names.intersection(modified_keys)) attributes.update((name, self.attributes[name]) for name in self.__attributes__.intersection(modified_keys)) return attributes class XCAPContactURI(xcap.ContactURI): """An XCAP ContactURI with attributes normalized to unicode""" __attributes__ = set() def __init__(self, id, uri, type, **attributes): normalized_attributes = dict((name, str(value) if value is not None else None) for name, value in list(attributes.items()) if name in self.__attributes__) super(XCAPContactURI, self).__init__(id, uri, type, **normalized_attributes) @classmethod def normalize(cls, uri): return cls(uri.id, uri.uri, uri.type, **uri.attributes) def get_modified(self, modified_keys): names = {'uri', 'type'} attributes = dict((name, getattr(self, name)) for name in names.intersection(modified_keys)) attributes.update((name, self.attributes[name]) for name in self.__attributes__.intersection(modified_keys)) return attributes class XCAPContact(xcap.Contact): """An XCAP Contact with attributes normalized to unicode""" __attributes__ = set() def __init__(self, id, name, uris, presence_handling=None, dialog_handling=None, **attributes): normalized_attributes = dict((name, str(value) if value is not None else None) for name, value in list(attributes.items()) if name in self.__attributes__) uris = xcap.ContactURIList((XCAPContactURI.normalize(uri) for uri in uris), default=getattr(uris, 'default', None)) super(XCAPContact, self).__init__(id, name, uris, presence_handling, dialog_handling, **normalized_attributes) @classmethod def normalize(cls, contact): return cls(contact.id, contact.name, contact.uris, contact.presence, contact.dialog, **contact.attributes) def get_modified(self, modified_keys): names = {'name', 'uris.default', 'presence.policy', 'presence.subscribe', 'dialog.policy', 'dialog.subscribe'} attributes = dict((name, recursive_getattr(self, name)) for name in names.intersection(modified_keys)) attributes.update((name, self.attributes[name]) for name in self.__attributes__.intersection(modified_keys)) return attributes class XCAPPolicy(xcap.Policy): """An XCAP Policy with attributes normalized to unicode""" __attributes__ = set() def __init__(self, id, uri, name, presence_handling=None, dialog_handling=None, **attributes): normalized_attributes = dict((name, str(value) if value is not None else None) for name, value in list(attributes.items()) if name in self.__attributes__) super(XCAPPolicy, self).__init__(id, uri, name, presence_handling, dialog_handling, **normalized_attributes) @classmethod def normalize(cls, policy): return cls(policy.id, policy.uri, policy.name, policy.presence, policy.dialog, **policy.attributes) def get_modified(self, modified_keys): names = {'uri', 'name', 'presence.policy', 'presence.subscribe', 'dialog.policy', 'dialog.subscribe'} attributes = dict((name, recursive_getattr(self, name)) for name in names.intersection(modified_keys)) attributes.update((name, self.attributes[name]) for name in self.__attributes__.intersection(modified_keys)) return attributes class ContactListDescriptor(AbstractSetting): def __init__(self): self.values = defaultweakobjectmap(ContactList) self.oldvalues = defaultweakobjectmap(ContactList) self.lock = Lock() def __get__(self, obj, objtype): if obj is None: return self with self.lock: return self.values[obj] def __set__(self, obj, value): if value is None: raise ValueError("setting attribute is not nillable") elif not isinstance(value, ContactList): value = ContactList(value) with self.lock: self.values[obj] = value def __getstate__(self, obj): with self.lock: return self.values[obj].__getstate__() def __setstate__(self, obj, value): if value is None: raise ValueError("setting attribute is not nillable") object = ContactList.__new__(ContactList) object.__setstate__(value) with self.lock: self.values[obj] = object self.oldvalues[obj] = ContactList(object) def get_modified(self, obj): with self.lock: old = self.oldvalues[obj] new = self.values[obj] with new.lock: old_ids = set(old.ids()) new_ids = set(new.ids()) added_contacts = [new[id] for id in new_ids - old_ids] removed_contacts = [old[id] for id in old_ids - new_ids] try: if added_contacts or removed_contacts: return ModifiedList(added=added_contacts, removed=removed_contacts, modified=None) else: return None finally: self.oldvalues[obj] = ContactList(new) def get_old(self, obj): with self.lock: return self.oldvalues[obj] def undo(self, obj): with self.lock: self.values[obj] = ContactList(self.oldvalues[obj]) class ContactList(object): def __new__(cls, contacts=None): instance = object.__new__(cls) instance.lock = Lock() return instance def __init__(self, contacts=None): self.contacts = dict((contact.id, contact) for contact in contacts or [] if contact.__state__ != 'deleted') def __getitem__(self, key): return self.contacts[key] def __contains__(self, key): return key in self.contacts def __iter__(self): return iter(sorted(list(self.contacts.values()), key=attrgetter('id'))) def __reversed__(self): return iter(sorted(list(self.contacts.values()), key=attrgetter('id'), reverse=True)) __hash__ = None def __len__(self): return len(self.contacts) def __eq__(self, other): if isinstance(other, ContactList): return self.contacts == other.contacts return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal def __repr__(self): return "%s(%r)" % (self.__class__.__name__, sorted(list(self.contacts.values()), key=attrgetter('id'))) def __getstate__(self): return list(self.contacts.keys()) def __setstate__(self, value): addressbook_manager = AddressbookManager() for id in [id for id in value if not addressbook_manager.has_contact(id)]: value.remove(id) with self.lock: self.contacts = dict((id, addressbook_manager.get_contact(id)) for id in value) def ids(self): return sorted(self.contacts.keys()) def add(self, contact): if contact.__state__ == 'deleted': return with self.lock: self.contacts[contact.id] = contact def remove(self, contact): with self.lock: self.contacts.pop(contact.id, None) class Group(SettingsState): __key__ = AddressbookKey('Groups') __id__ = SettingsObjectImmutableID(type=ID) id = __id__ name = Setting(type=str, default='') contacts = ContactListDescriptor() def __new__(cls, id=None): with AddressbookManager.load.lock: if not AddressbookManager.load.called: raise RuntimeError("cannot instantiate %s before calling AddressbookManager.load" % cls.__name__) if id is None: id = unique_id() elif not isinstance(id, str): raise TypeError("id needs to be a string or unicode object") instance = SettingsState.__new__(cls) instance.__id__ = id instance.__state__ = 'new' instance.__xcapgroup__ = None configuration = ConfigurationManager() try: data = configuration.get(instance.__key__) except ObjectNotFoundError: pass else: instance.__setstate__(data) instance.__state__ = 'loaded' instance.__xcapgroup__ = instance.__toxcap__() return instance def __establish__(self): if self.__state__ == 'loaded': self.__state__ = 'active' notification_center = NotificationCenter() notification_center.post_notification('AddressbookGroupWasActivated', sender=self) def __repr__(self): return "%s(id=%r)" % (self.__class__.__name__, self.id) def __toxcap__(self): xcap_contacts = [contact.__xcapcontact__ for contact in self.contacts] attributes = dict((name, getattr(self, name)) for name, attr in list(vars(self.__class__).items()) if isinstance(attr, SharedSetting)) return XCAPGroup(self.id, self.name, xcap_contacts, **attributes) @run_in_thread('file-io') def _internal_save(self, originator): if self.__state__ == 'deleted': return for contact in [contact for contact in self.contacts if contact.__state__ == 'deleted']: self.contacts.remove(contact) modified_settings = self.get_modified() if not modified_settings and self.__state__ != 'new': return account_manager = AccountManager() configuration = ConfigurationManager() notification_center = NotificationCenter() if originator is Local: originator_account = None previous_xcapgroup = self.__xcapgroup__ else: originator_account = originator.account previous_xcapgroup = originator.xcap_object xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] self.__xcapgroup__ = self.__toxcap__() if self.__state__ == 'new': configuration.update(self.__key__, self.__getstate__()) self.__state__ = 'active' for account in (account for account in xcap_accounts if account is not originator_account): account.xcap_manager.add_group(self.__xcapgroup__) modified_data = None notification_center.post_notification('AddressbookGroupWasActivated', sender=self) notification_center.post_notification('AddressbookGroupWasCreated', sender=self) elif all(isinstance(self.__settings__[key], RuntimeSetting) for key in modified_settings): notification_center.post_notification('AddressbookGroupDidChange', sender=self, data=NotificationData(modified=modified_settings)) return else: configuration.update(self.__key__, self.__getstate__()) attributes = self.__xcapgroup__.get_modified(modified_settings) if 'contacts' in modified_settings: added_contacts = [contact.__xcapcontact__ for contact in modified_settings['contacts'].added] removed_contacts = [contact.__xcapcontact__ for contact in modified_settings['contacts'].removed] else: added_contacts = [] removed_contacts = [] if self.__xcapgroup__ != previous_xcapgroup: outofsync_accounts = xcap_accounts elif originator is Local: outofsync_accounts = [] else: outofsync_accounts = list(account for account in xcap_accounts if account is not originator_account) with MultiAccountTransaction(outofsync_accounts): for account in outofsync_accounts: xcap_manager = account.xcap_manager for xcapcontact in added_contacts: xcap_manager.add_group_member(self.__xcapgroup__, xcapcontact) for xcapcontact in removed_contacts: xcap_manager.remove_group_member(self.__xcapgroup__, xcapcontact) if attributes: xcap_manager.update_group(self.__xcapgroup__, attributes) notification_center.post_notification('AddressbookGroupDidChange', sender=self, data=NotificationData(modified=modified_settings)) modified_data = modified_settings try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='save', modified=modified_data, exception=e)) @run_in_thread('file-io') def _internal_delete(self, originator): if self.__state__ == 'deleted': return self.__state__ = 'deleted' configuration = ConfigurationManager() account_manager = AccountManager() notification_center = NotificationCenter() if originator is Local: originator_account = None else: originator_account = originator.account configuration.delete(self.__key__) for account in (account for account in account_manager.iter_accounts() if account.xcap.discovered and account is not originator_account): account.xcap_manager.remove_group(self.__xcapgroup__) notification_center.post_notification('AddressbookGroupWasDeleted', sender=self) try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='delete', exception=e)) def save(self): """ Store the group into persistent storage (local and xcap). This method will post the AddressbookGroupWasCreated and AddressbookGroupWasActivated notifications on the first save or a AddressbookGroupDidChange notification on subsequent saves, regardless of whether the contact has been saved to persistent storage or not. A CFGManagerSaveFailed notification is posted if saving to the persistent configuration storage fails. """ self._internal_save(originator=Local) def delete(self): """Remove the group from the persistent storage.""" self._internal_delete(originator=Local) def clone(self, new_id=None): """Create a copy of this group and all its sub-settings.""" raise NotImplementedError @classmethod def register_extension(cls, extension): """ Register an extension for this class. All Settings and SettingsGroups defined in the extension will be added to this class, overwriting any attributes with the same name. Other attributes in the extension are ignored. """ if not issubclass(extension, GroupExtension): raise TypeError("expected subclass of GroupExtension, got %r" % (extension,)) for name in dir(extension): attribute = getattr(extension, name, None) if isinstance(attribute, SharedSetting): if SharedSetting.__namespace__ is None: raise RuntimeError("cannot use SharedSetting attributes without first calling SharedSetting.set_namespace") XCAPGroup.__attributes__.add(name) if isinstance(attribute, (AbstractSetting, SettingsGroupMeta)): setattr(cls, name, attribute) class GroupExtension(object): """Base class for extensions of Groups""" def __new__(cls, *args, **kw): raise TypeError("GroupExtension subclasses cannot be instantiated") class ContactURI(SettingsState): __id__ = SettingsObjectImmutableID(type=ID) id = __id__ uri = Setting(type=str, default='') type = Setting(type=str, default=None, nillable=True) def __new__(cls, id=None, **state): if id is None: id = unique_id() elif not isinstance(id, str): raise TypeError("id needs to be a string or unicode object") instance = SettingsState.__new__(cls) instance.__id__ = id instance.__setstate__(state) return instance def __repr__(self): return "%s(id=%r)" % (self.__class__.__name__, self.id) def __toxcap__(self): attributes = dict((name, getattr(self, name)) for name, attr in list(vars(self.__class__).items()) if isinstance(attr, SharedSetting)) return XCAPContactURI(self.id, self.uri, self.type, **attributes) @classmethod def register_extension(cls, extension): """ Register an extension for this class. All Settings and SettingsGroups defined in the extension will be added to this class, overwriting any attributes with the same name. Other attributes in the extension are ignored. """ if not issubclass(extension, ContactURIExtension): raise TypeError("expected subclass of ContactURIExtension, got %r" % (extension,)) for name in dir(extension): attribute = getattr(extension, name, None) if isinstance(attribute, SharedSetting): if SharedSetting.__namespace__ is None: raise RuntimeError("cannot use SharedSetting attributes without first calling SharedSetting.set_namespace") XCAPContactURI.__attributes__.add(name) if isinstance(attribute, (AbstractSetting, SettingsGroupMeta)): setattr(cls, name, attribute) class ContactURIExtension(object): """Base class for extensions of ContactURIs""" def __new__(cls, *args, **kw): raise TypeError("ContactURIExtension subclasses cannot be instantiated") class DefaultContactURI(Setting): def __init__(self): super(DefaultContactURI, self).__init__(type=str, default=None, nillable=True) def __get__(self, obj, objtype): value = super(DefaultContactURI, self).__get__(obj, objtype) return value if value in (self, None) else obj._item_map.get(value) def __set__(self, obj, value): if value is not None: if not isinstance(value, ContactURI): raise TypeError("the default URI must be a ContactURI instance or None") with obj._lock: if value.id not in obj._item_map: raise ValueError("the default URI can only be set to one of the URIs of the contact") super(DefaultContactURI, self).__set__(obj, value.id) else: super(DefaultContactURI, self).__set__(obj, None) def get_modified(self, obj): modified_value = super(DefaultContactURI, self).get_modified(obj) if modified_value is not None: old_uri = obj._item_map.old.get(modified_value.old) if modified_value.old is not None else None new_uri = obj._item_map.get(modified_value.new) if modified_value.new is not None else None modified_value = ModifiedValue(old=old_uri, new=new_uri) return modified_value def get_old(self, obj): value = super(DefaultContactURI, self).get_old(obj) return value if value is None else obj._item_map.old.get(value) class ContactURIManagement(ItemManagement): def remove_item(self, item, collection): if collection.default is item: collection.default = None def set_items(self, items, collection): if collection.default is not None and collection.default not in items: collection.default = None class ContactURIList(ItemCollection): _item_type = ContactURI _item_management = ContactURIManagement() default = DefaultContactURI() class DialogSettings(SettingsGroup): policy = Setting(type=PolicyValue, default='default') subscribe = Setting(type=bool, default=False) class PresenceSettings(SettingsGroup): policy = Setting(type=PolicyValue, default='default') subscribe = Setting(type=bool, default=False) class Contact(SettingsState): __key__ = AddressbookKey('Contacts') __id__ = SettingsObjectImmutableID(type=ID) id = __id__ name = Setting(type=str, default='') uris = ContactURIList dialog = DialogSettings presence = PresenceSettings def __new__(cls, id=None): with AddressbookManager.load.lock: if not AddressbookManager.load.called: raise RuntimeError("cannot instantiate %s before calling AddressbookManager.load" % cls.__name__) if id is None: id = unique_id() elif not isinstance(id, str): raise TypeError("id needs to be a string or unicode object") instance = SettingsState.__new__(cls) instance.__id__ = id instance.__state__ = 'new' instance.__xcapcontact__ = None configuration = ConfigurationManager() try: data = configuration.get(instance.__key__) except ObjectNotFoundError: pass else: instance.__setstate__(data) instance.__state__ = 'loaded' instance.__xcapcontact__ = instance.__toxcap__() return instance def __establish__(self): if self.__state__ == 'loaded': self.__state__ = 'active' notification_center = NotificationCenter() notification_center.post_notification('AddressbookContactWasActivated', sender=self) def __repr__(self): return "%s(id=%r)" % (self.__class__.__name__, self.id) def __toxcap__(self): contact_uris = xcap.ContactURIList((uri.__toxcap__() for uri in self.uris), default=self.uris.default.id if self.uris.default is not None else None) dialog_handling = xcap.EventHandling(self.dialog.policy, self.dialog.subscribe) presence_handling = xcap.EventHandling(self.presence.policy, self.presence.subscribe) attributes = dict((name, getattr(self, name)) for name, attr in list(vars(self.__class__).items()) if isinstance(attr, SharedSetting)) return XCAPContact(self.id, self.name, contact_uris, presence_handling, dialog_handling, **attributes) @run_in_thread('file-io') def _internal_save(self, originator): if self.__state__ == 'deleted': return modified_settings = self.get_modified() if not modified_settings and self.__state__ != 'new': return account_manager = AccountManager() configuration = ConfigurationManager() notification_center = NotificationCenter() if originator is Local: originator_account = None previous_xcapcontact = self.__xcapcontact__ else: originator_account = originator.account previous_xcapcontact = originator.xcap_object xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] self.__xcapcontact__ = self.__toxcap__() if self.__state__ == 'new': configuration.update(self.__key__, self.__getstate__()) self.__state__ = 'active' for account in (account for account in xcap_accounts if account is not originator_account): account.xcap_manager.add_contact(self.__xcapcontact__) modified_data = None notification_center.post_notification('AddressbookContactWasActivated', sender=self) notification_center.post_notification('AddressbookContactWasCreated', sender=self) elif all(isinstance(self.__settings__[key], RuntimeSetting) for key in modified_settings): notification_center.post_notification('AddressbookContactDidChange', sender=self, data=NotificationData(modified=modified_settings)) return else: configuration.update(self.__key__, self.__getstate__()) contact_attributes = self.__xcapcontact__.get_modified(modified_settings) if 'uris' in modified_settings: xcap_uris = self.__xcapcontact__.uris added_uris = [xcap_uris[uri.id] for uri in modified_settings['uris'].added] removed_uris = [uri.__toxcap__() for uri in modified_settings['uris'].removed] modified_uris = dict((xcap_uris[id], xcap_uris[id].get_modified(changemap)) for id, changemap in list(modified_settings['uris'].modified.items())) else: added_uris = [] removed_uris = [] modified_uris = {} if self.__xcapcontact__ != previous_xcapcontact: outofsync_accounts = xcap_accounts elif originator is Local: outofsync_accounts = [] else: outofsync_accounts = list(account for account in xcap_accounts if account is not originator_account) with MultiAccountTransaction(outofsync_accounts): for account in outofsync_accounts: xcap_manager = account.xcap_manager for xcapuri in added_uris: xcap_manager.add_contact_uri(self.__xcapcontact__, xcapuri) for xcapuri in removed_uris: xcap_manager.remove_contact_uri(self.__xcapcontact__, xcapuri) for xcapuri, uri_attributes in list(modified_uris.items()): xcap_manager.update_contact_uri(self.__xcapcontact__, xcapuri, uri_attributes) if contact_attributes: xcap_manager.update_contact(self.__xcapcontact__, contact_attributes) notification_center.post_notification('AddressbookContactDidChange', sender=self, data=NotificationData(modified=modified_settings)) modified_data = modified_settings try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='save', modified=modified_data, exception=e)) @run_in_thread('file-io') def _internal_delete(self, originator): if self.__state__ == 'deleted': return self.__state__ = 'deleted' configuration = ConfigurationManager() account_manager = AccountManager() addressbook_manager = AddressbookManager() notification_center = NotificationCenter() if originator is Local: originator_account = None else: originator_account = originator.account configuration.delete(self.__key__) xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] with MultiAccountTransaction(xcap_accounts): for group in (group for group in addressbook_manager.get_groups() if self.id in group.contacts): group.contacts.remove(self) group.save() for account in (account for account in xcap_accounts if account is not originator_account): account.xcap_manager.remove_contact(self.__xcapcontact__) notification_center.post_notification('AddressbookContactWasDeleted', sender=self) try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='delete', exception=e)) def save(self): """ Store the contact into persistent storage (local and xcap). This method will post the AddressbookContactWasCreated and AddressbookContactWasActivated notifications on the first save or a AddressbookContactDidChange notification on subsequent saves, regardless of whether the contact has been saved to persistent storage or not. A CFGManagerSaveFailed notification is posted if saving to the persistent configuration storage fails. """ self._internal_save(originator=Local) def delete(self): """Remove the contact from the persistent storage.""" self._internal_delete(originator=Local) def clone(self, new_id=None): """Create a copy of this contact and all its sub-settings.""" raise NotImplementedError @classmethod def register_extension(cls, extension): """ Register an extension for this class. All Settings and SettingsGroups defined in the extension will be added to this class, overwriting any attributes with the same name. Other attributes in the extension are ignored. """ if not issubclass(extension, ContactExtension): raise TypeError("expected subclass of ContactExtension, got %r" % (extension,)) for name in dir(extension): attribute = getattr(extension, name, None) if isinstance(attribute, SharedSetting): if SharedSetting.__namespace__ is None: raise RuntimeError("cannot use SharedSetting attributes without first calling SharedSetting.set_namespace") XCAPContact.__attributes__.add(name) if isinstance(attribute, (AbstractSetting, SettingsGroupMeta)): setattr(cls, name, attribute) class ContactExtension(object): """Base class for extensions of Contacts""" def __new__(cls, *args, **kw): raise TypeError("ContactExtension subclasses cannot be instantiated") class Policy(SettingsState): __key__ = AddressbookKey('Policies') __id__ = SettingsObjectImmutableID(type=ID) id = __id__ uri = Setting(type=str, default='') name = Setting(type=str, default='') dialog = DialogSettings presence = PresenceSettings def __new__(cls, id=None): with AddressbookManager.load.lock: if not AddressbookManager.load.called: raise RuntimeError("cannot instantiate %s before calling AddressbookManager.load" % cls.__name__) if id is None: id = unique_id() elif not isinstance(id, str): raise TypeError("id needs to be a string or unicode object") instance = SettingsState.__new__(cls) instance.__id__ = id instance.__state__ = 'new' instance.__xcappolicy__ = None configuration = ConfigurationManager() try: data = configuration.get(instance.__key__) except ObjectNotFoundError: pass else: instance.__setstate__(data) instance.__state__ = 'loaded' instance.__xcappolicy__ = instance.__toxcap__() return instance def __establish__(self): if self.__state__ == 'loaded': self.__state__ = 'active' notification_center = NotificationCenter() notification_center.post_notification('AddressbookPolicyWasActivated', sender=self) def __repr__(self): return "%s(id=%r)" % (self.__class__.__name__, self.id) def __toxcap__(self): dialog_handling = xcap.EventHandling(self.dialog.policy, self.dialog.subscribe) presence_handling = xcap.EventHandling(self.presence.policy, self.presence.subscribe) attributes = dict((name, getattr(self, name)) for name, attr in list(vars(self.__class__).items()) if isinstance(attr, SharedSetting)) return XCAPPolicy(self.id, self.uri, self.name, presence_handling, dialog_handling, **attributes) @run_in_thread('file-io') def _internal_save(self, originator): if self.__state__ == 'deleted': return modified_settings = self.get_modified() if not modified_settings and self.__state__ != 'new': return account_manager = AccountManager() configuration = ConfigurationManager() notification_center = NotificationCenter() if originator is Local: originator_account = None previous_xcappolicy = self.__xcappolicy__ else: originator_account = originator.account previous_xcappolicy = originator.xcap_object xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] self.__xcappolicy__ = self.__toxcap__() if self.__state__ == 'new': configuration.update(self.__key__, self.__getstate__()) self.__state__ = 'active' for account in (account for account in xcap_accounts if account is not originator_account): account.xcap_manager.add_policy(self.__xcappolicy__) modified_data = None notification_center.post_notification('AddressbookPolicyWasActivated', sender=self) notification_center.post_notification('AddressbookPolicyWasCreated', sender=self) elif all(isinstance(self.__settings__[key], RuntimeSetting) for key in modified_settings): notification_center.post_notification('AddressbookPolicyDidChange', sender=self, data=NotificationData(modified=modified_settings)) return else: configuration.update(self.__key__, self.__getstate__()) attributes = self.__xcappolicy__.get_modified(modified_settings) if self.__xcappolicy__ != previous_xcappolicy: outofsync_accounts = xcap_accounts elif originator is Local: outofsync_accounts = [] else: outofsync_accounts = list(account for account in xcap_accounts if account is not originator_account) for account in outofsync_accounts: account.xcap_manager.update_policy(self.__xcappolicy__, attributes) notification_center.post_notification('AddressbookPolicyDidChange', sender=self, data=NotificationData(modified=modified_settings)) modified_data = modified_settings try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='save', modified=modified_data, exception=e)) @run_in_thread('file-io') def _internal_delete(self, originator): if self.__state__ == 'deleted': return self.__state__ = 'deleted' configuration = ConfigurationManager() account_manager = AccountManager() notification_center = NotificationCenter() if originator is Local: originator_account = None else: originator_account = originator.account configuration.delete(self.__key__) for account in (account for account in account_manager.iter_accounts() if account.xcap.discovered and account is not originator_account): account.xcap_manager.remove_policy(self.__xcappolicy__) notification_center.post_notification('AddressbookPolicyWasDeleted', sender=self) try: configuration.save() except Exception as e: log.exception() notification_center.post_notification('CFGManagerSaveFailed', sender=configuration, data=NotificationData(object=self, operation='delete', exception=e)) def save(self): """ Store the policy into persistent storage (local and xcap). It will post the AddressbookPolicyWasCreated and AddressbookPolicyWasActivated notifications on the first save or a AddressbookPolicyDidChange notification on subsequent saves, regardless of whether the policy has been saved to persistent storage or not. A CFGManagerSaveFailed notification is posted if saving to the persistent configuration storage fails. """ self._internal_save(originator=Local) def delete(self): """Remove the policy from the persistent storage.""" self._internal_delete(originator=Local) def clone(self, new_id=None): """Create a copy of this policy and all its sub-settings.""" raise NotImplementedError @classmethod def register_extension(cls, extension): """ Register an extension for this class. All Settings and SettingsGroups defined in the extension will be added to this class, overwriting any attributes with the same name. Other attributes in the extension are ignored. """ if not issubclass(extension, PolicyExtension): raise TypeError("expected subclass of PolicyExtension, got %r" % (extension,)) for name in dir(extension): attribute = getattr(extension, name, None) if isinstance(attribute, SharedSetting): if SharedSetting.__namespace__ is None: raise RuntimeError("cannot use SharedSetting attributes without first calling SharedSetting.set_namespace") XCAPPolicy.__attributes__.add(name) if isinstance(attribute, (AbstractSetting, SettingsGroupMeta)): setattr(cls, name, attribute) class PolicyExtension(object): """Base class for extensions of Policies""" def __new__(cls, *args, **kw): raise TypeError("PolicyExtension subclasses cannot be instantiated") @implementer(IObserver) class AddressbookManager(object, metaclass=Singleton): def __init__(self): self.contacts = {} self.groups = {} self.policies = {} self.__xcapaddressbook__ = None notification_center = NotificationCenter() notification_center.add_observer(self, name='AddressbookContactWasActivated') notification_center.add_observer(self, name='AddressbookContactWasDeleted') notification_center.add_observer(self, name='AddressbookGroupWasActivated') notification_center.add_observer(self, name='AddressbookGroupWasDeleted') notification_center.add_observer(self, name='AddressbookPolicyWasActivated') notification_center.add_observer(self, name='AddressbookPolicyWasDeleted') notification_center.add_observer(self, name='SIPAccountDidDiscoverXCAPSupport') notification_center.add_observer(self, name='XCAPManagerDidReloadData') @execute_once def load(self): configuration = ConfigurationManager() # temporary workaround to migrate contacts to the new format. to be removed later. -Dan if 'Contacts' in configuration.data or 'ContactGroups' in configuration.data: account_manager = AccountManager() old_data = dict(contacts=configuration.data.pop('Contacts', {}), groups=configuration.data.pop('ContactGroups', {})) if any(account.enabled and account.xcap.enabled and account.xcap.discovered for account in account_manager.get_accounts()): self.__old_data = old_data else: self.__migrate_contacts(old_data) return [Contact(id=id) for id in configuration.get_names(Contact.__key__)] [Group(id=id) for id in configuration.get_names(Group.__key__)] [Policy(id=id) for id in configuration.get_names(Policy.__key__)] def start(self): pass def stop(self): pass def has_contact(self, id): return id in self.contacts def get_contact(self, id): return self.contacts[id] def get_contacts(self): return list(self.contacts.values()) def has_group(self, id): return id in self.groups def get_group(self, id): return self.groups[id] def get_groups(self): return list(self.groups.values()) def has_policy(self, id): return id in self.policies def get_policy(self, id): return self.policies[id] def get_policies(self): return list(self.policies.values()) @classmethod def transaction(cls): account_manager = AccountManager() xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] return MultiAccountTransaction(xcap_accounts) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AddressbookContactWasActivated(self, notification): contact = notification.sender self.contacts[contact.id] = contact notification.center.post_notification('AddressbookManagerDidAddContact', sender=self, data=NotificationData(contact=contact)) def _NH_AddressbookContactWasDeleted(self, notification): contact = notification.sender del self.contacts[contact.id] notification.center.post_notification('AddressbookManagerDidRemoveContact', sender=self, data=NotificationData(contact=contact)) def _NH_AddressbookGroupWasActivated(self, notification): group = notification.sender self.groups[group.id] = group notification.center.post_notification('AddressbookManagerDidAddGroup', sender=self, data=NotificationData(group=group)) def _NH_AddressbookGroupWasDeleted(self, notification): group = notification.sender del self.groups[group.id] notification.center.post_notification('AddressbookManagerDidRemoveGroup', sender=self, data=NotificationData(group=group)) def _NH_AddressbookPolicyWasActivated(self, notification): policy = notification.sender self.policies[policy.id] = policy notification.center.post_notification('AddressbookManagerDidAddPolicy', sender=self, data=NotificationData(policy=policy)) def _NH_AddressbookPolicyWasDeleted(self, notification): policy = notification.sender del self.policies[policy.id] notification.center.post_notification('AddressbookManagerDidRemovePolicy', sender=self, data=NotificationData(policy=policy)) @run_in_thread('file-io') def _NH_SIPAccountDidDiscoverXCAPSupport(self, notification): xcap_manager = notification.sender.xcap_manager with xcap_manager.transaction(): for contact in list(self.contacts.values()): xcap_manager.add_contact(contact.__xcapcontact__) for group in list(self.groups.values()): xcap_manager.add_group(group.__xcapgroup__) for policy in list(self.policies.values()): xcap_manager.add_policy(policy.__xcappolicy__) @run_in_thread('file-io') def _NH_XCAPManagerDidReloadData(self, notification): if notification.data.addressbook == self.__xcapaddressbook__: return self.__xcapaddressbook__ = notification.data.addressbook xcap_manager = notification.sender xcap_contacts = notification.data.addressbook.contacts xcap_groups = notification.data.addressbook.groups xcap_policies = notification.data.addressbook.policies account_manager = AccountManager() xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] # temporary workaround to migrate contacts to the new format. to be removed later. -Dan if hasattr(self, '_AddressbookManager__old_data'): old_data = self.__old_data del self.__old_data if not xcap_contacts and not xcap_groups: self.__migrate_contacts(old_data) return with MultiAccountTransaction(xcap_accounts): # because groups depend on contacts, operation order is add/update contacts, add/update/remove groups & policies, remove contacts -Dan for xcap_contact in xcap_contacts: xcap_contact = XCAPContact.normalize(xcap_contact) try: contact = self.contacts[xcap_contact.id] except KeyError: try: contact = Contact(xcap_contact.id) except DuplicateIDError: log.exception() continue contact.name = xcap_contact.name contact.presence.policy = xcap_contact.presence.policy contact.presence.subscribe = xcap_contact.presence.subscribe contact.dialog.policy = xcap_contact.dialog.policy contact.dialog.subscribe = xcap_contact.dialog.subscribe for name, value in list(xcap_contact.attributes.items()): setattr(contact, name, value) for xcap_uri in xcap_contact.uris: xcap_uri = XCAPContactURI.normalize(xcap_uri) try: uri = contact.uris[xcap_uri.id] except KeyError: try: uri = ContactURI(xcap_uri.id) except DuplicateIDError: log.exception() continue contact.uris.add(uri) uri.uri = xcap_uri.uri uri.type = xcap_uri.type for name, value in list(xcap_uri.attributes.items()): setattr(uri, name, value) for uri in (uri for uri in list(contact.uris) if uri.id not in xcap_contact.uris): contact.uris.remove(uri) contact.uris.default = contact.uris.get(xcap_contact.uris.default, None) contact._internal_save(originator=Remote(xcap_manager.account, xcap_contact)) for xcap_group in xcap_groups: xcap_group = XCAPGroup.normalize(xcap_group) try: group = self.groups[xcap_group.id] except KeyError: try: group = Group(xcap_group.id) except DuplicateIDError: log.exception() continue group.name = xcap_group.name for name, value in list(xcap_group.attributes.items()): setattr(group, name, value) old_contact_ids = set(group.contacts.ids()) new_contact_ids = set(xcap_group.contacts.ids()) for contact in (self.contacts[id] for id in new_contact_ids - old_contact_ids): group.contacts.add(contact) for contact in (group.contacts[id] for id in old_contact_ids - new_contact_ids): group.contacts.remove(contact) group._internal_save(originator=Remote(xcap_manager.account, xcap_group)) for xcap_policy in xcap_policies: xcap_policy = XCAPPolicy.normalize(xcap_policy) try: policy = self.policies[xcap_policy.id] except KeyError: try: policy = Policy(xcap_policy.id) except DuplicateIDError: log.exception() continue policy.uri = xcap_policy.uri policy.name = xcap_policy.name policy.presence.policy = xcap_policy.presence.policy policy.presence.subscribe = xcap_policy.presence.subscribe policy.dialog.policy = xcap_policy.dialog.policy policy.dialog.subscribe = xcap_policy.dialog.subscribe for name, value in list(xcap_policy.attributes.items()): setattr(policy, name, value) policy._internal_save(originator=Remote(xcap_manager.account, xcap_policy)) originator = Remote(xcap_manager.account, None) for policy in (policy for policy in list(self.policies.values()) if policy.id not in xcap_policies): policy._internal_delete(originator=originator) for group in (group for group in list(self.groups.values()) if group.id not in xcap_groups): group._internal_delete(originator=originator) for contact in (contact for contact in list(self.contacts.values()) if contact.id not in xcap_contacts): contact._internal_delete(originator=originator) def __migrate_contacts(self, old_data): account_manager = AccountManager() xcap_accounts = [account for account in account_manager.get_accounts() if account.xcap.discovered] with MultiAccountTransaction(xcap_accounts): # restore the old contacts and groups old_groups = old_data['groups'] old_contacts = old_data['contacts'] group_idmap = {} for group_id, group_state in list(old_groups.items()): group_idmap[group_id] = group = Group() for name, value in list(group_state.items()): try: setattr(group, name, value) except (ValueError, TypeError): pass for account_id, account_contacts in list(old_contacts.items()): for group_id, contact_map in list(account_contacts.items()): for uri, contact_data in list(contact_map.items()): contact = Contact() for name, value in list(contact_data.items()): try: setattr(contact, name, value) except (ValueError, TypeError): pass contact.uris.add(ContactURI(uri=uri)) contact.save() group = group_idmap.get(group_id, Null) group.contacts.add(contact) for group in list(group_idmap.values()): group.save() diff --git a/sipsimple/application.py b/sipsimple/application.py index 2376a952..93f36c95 100644 --- a/sipsimple/application.py +++ b/sipsimple/application.py @@ -1,527 +1,527 @@ """ Implements a high-level application responsible for starting and stopping various sub-systems required to implement a fully featured SIP User Agent application. """ __all__ = ["SIPApplication"] import os -from .application.notification import IObserver, NotificationCenter, NotificationData -from .application.python import Null -from .application.python.descriptor import classproperty -from .application.python.types import Singleton +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null +from application.python.descriptor import classproperty +from application.python.types import Singleton from eventlib import proc from operator import attrgetter -from .threading import RLock, Thread +from threading import RLock, Thread from twisted.internet import reactor from uuid import uuid4 from xcaplib import client as xcap_client from zope.interface import implementer from sipsimple.account import AccountManager from sipsimple.addressbook import AddressbookManager from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine from sipsimple.lookup import DNSManager from sipsimple.session import SessionManager from sipsimple.storage import ISIPSimpleStorage, ISIPSimpleApplicationDataStorage from sipsimple.threading import ThreadManager, run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.video import VideoDevice class ApplicationAttribute(object): def __init__(self, value): self.value = value def __get__(self, obj, objtype): return self.value def __set__(self, obj, value): self.value = value def __delete__(self, obj): raise AttributeError('cannot delete attribute') @implementer(IObserver) class SIPApplication(object, metaclass=Singleton): storage = ApplicationAttribute(value=None) engine = ApplicationAttribute(value=None) thread = ApplicationAttribute(value=None) state = ApplicationAttribute(value=None) alert_audio_device = ApplicationAttribute(value=None) alert_audio_bridge = ApplicationAttribute(value=None) voice_audio_device = ApplicationAttribute(value=None) voice_audio_bridge = ApplicationAttribute(value=None) video_device = ApplicationAttribute(value=None) _lock = ApplicationAttribute(value=RLock()) _timer = ApplicationAttribute(value=None) _stop_pending = ApplicationAttribute(value=False) running = classproperty(lambda cls: cls.state == 'started') alert_audio_mixer = classproperty(lambda cls: cls.alert_audio_bridge.mixer if cls.alert_audio_bridge else None) voice_audio_mixer = classproperty(lambda cls: cls.voice_audio_bridge.mixer if cls.voice_audio_bridge else None) def start(self, storage): if not ISIPSimpleStorage.providedBy(storage): raise TypeError("storage must implement the ISIPSimpleStorage interface") with self._lock: if self.state is not None: raise RuntimeError("SIPApplication cannot be started from '%s' state" % self.state) self.state = 'starting' self.engine = Engine() self.storage = storage thread_manager = ThreadManager() thread_manager.start() configuration_manager = ConfigurationManager() addressbook_manager = AddressbookManager() account_manager = AccountManager() # load configuration and initialize core try: configuration_manager.start() SIPSimpleSettings() account_manager.load() addressbook_manager.load() except: self.engine = None self.state = None self.storage = None raise # run the reactor thread self.thread = Thread(name='Reactor Thread', target=self._run_reactor) self.thread.start() def stop(self): with self._lock: if self.state in (None, 'stopping', 'stopped'): return elif self.state == 'starting': self._stop_pending = True return self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPApplicationWillEnd', sender=self) self._shutdown_subsystems() def _run_reactor(self): from eventlib.twistedutil import join_reactor; del join_reactor # imported for the side effect of making the twisted reactor green notification_center = NotificationCenter() notification_center.post_notification('SIPApplicationWillStart', sender=self) with self._lock: stop_pending = self._stop_pending if stop_pending: self.state = 'stopping' if stop_pending: notification_center.post_notification('SIPApplicationWillEnd', sender=self) else: self._initialize_core() reactor.run(installSignalHandlers=False) with self._lock: self.state = 'stopped' notification_center.post_notification('SIPApplicationDidEnd', sender=self) def _initialize_core(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general user_agent=settings.user_agent, # SIP detect_sip_loops=True, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) def _initialize_tls(self): settings = SIPSimpleSettings() account_manager = AccountManager() account = account_manager.default_account if account is not None: try: self.engine.set_tls_options(port=settings.sip.tls_port, verify_server=account.tls.verify_server, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account.tls.certificate else None) except Exception as e: notification_center = NotificationCenter() notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e)) @run_in_green_thread def _initialize_subsystems(self): notification_center = NotificationCenter() with self._lock: stop_pending = self._stop_pending if stop_pending: self.state = 'stopping' if stop_pending: notification_center.post_notification('SIPApplicationWillEnd', sender=self) # stop the subsystems we already started: threads, engine and reactor self.engine.stop() self.engine.join(timeout=5) thread_manager = ThreadManager() thread_manager.stop() reactor.stop() return account_manager = AccountManager() addressbook_manager = AddressbookManager() dns_manager = DNSManager() session_manager = SessionManager() settings = SIPSimpleSettings() xcap_client.DEFAULT_HEADERS = {'User-Agent': settings.user_agent} # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects alert_device = settings.audio.alert_device if alert_device not in (None, 'system_default') and alert_device not in self.engine.output_devices: alert_device = 'system_default' input_device = settings.audio.input_device if input_device not in (None, 'system_default') and input_device not in self.engine.input_devices: input_device = 'system_default' output_device = settings.audio.output_device if output_device not in (None, 'system_default') and output_device not in self.engine.output_devices: output_device = 'system_default' tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0 voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, tail_length) voice_mixer.muted = settings.audio.muted self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0) if settings.audio.silent: alert_mixer.output_volume = 0 self.alert_audio_device = AudioDevice(alert_mixer) self.alert_audio_bridge = RootAudioBridge(alert_mixer) self.alert_audio_bridge.add(self.alert_audio_device) settings.audio.input_device = voice_mixer.input_device settings.audio.output_device = voice_mixer.output_device settings.audio.alert_device = alert_mixer.output_device # initialize video self.video_device = VideoDevice(settings.video.device, settings.video.resolution, settings.video.framerate) self.video_device.muted = settings.video.muted settings.video.device = self.video_device.name self.engine.set_video_options(settings.video.resolution, settings.video.framerate, settings.video.max_bitrate) self.engine.set_h264_options(settings.video.h264.profile, settings.video.h264.level) # initialize instance id if not settings.instance_id: settings.instance_id = uuid4().urn # initialize path for ZRTP cache file if ISIPSimpleApplicationDataStorage.providedBy(self.storage): self.engine.zrtp_cache = os.path.join(self.storage.directory, 'zrtp.db') # save settings in case something was modified during startup settings.save() # initialize middleware components dns_manager.start() account_manager.start() addressbook_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') notification_center.add_observer(self, name='DNSNameserversDidChange') notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') with self._lock: self.state = 'started' stop_pending = self._stop_pending notification_center.post_notification('SIPApplicationDidStart', sender=self) if stop_pending: self.stop() @run_in_green_thread def _shutdown_subsystems(self): # cleanup internals if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() addressbook_manager = AddressbookManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(addressbook_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # stop video device self.video_device.producer.close() # shutdown engine self.engine.stop() self.engine.join(timeout=5) # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _network_conditions_changed(self): if self.running and self._timer is None: def notify(): if self.running: settings = SIPSimpleSettings() if 'tcp' in settings.sip.transport_list: self.engine.set_tcp_port(None) self.engine.set_tcp_port(settings.sip.tcp_port) if 'tls' in settings.sip.transport_list: self._initialize_tls() notification_center = NotificationCenter() notification_center.post_notification('NetworkConditionsDidChange', sender=self) self._timer = None self._timer = reactor.callLater(5, notify) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPEngineDidStart(self, notification): self._initialize_subsystems() def _NH_SIPEngineDidFail(self, notification): with self._lock: if self.state == 'stopping': return self.state = 'stopping' notification.center.post_notification('SIPApplicationWillEnd', sender=self) # # In theory we need to stop the subsystems here, based on what subsystems are already running according to our state, # but in practice the majority of those subsystems need the engine even to stop and the engine has failed. # # Even the ThreadManager might have threads that try to execute operations on the engine, which could block indefinitely # waiting for an answer that will no longer arrive, thus blocking the ThreadManager stop operation. # # As a result the safest thing to do is to just stop the engine thread and the reactor, which means in this case we # will not cleanup properly (the engine thread should already have ended as a result of the failure, so stopping it # is technically a no-op). # self.engine.stop() self.engine.join(timeout=5) reactor.stop() def _NH_SIPEngineGotException(self, notification): notification.center.post_notification('SIPApplicationGotFatalError', sender=self, data=notification.data) @run_in_thread('device-io') def _NH_CFGSettingsObjectDidChange(self, notification): settings = SIPSimpleSettings() account_manager = AccountManager() if notification.sender is settings: if 'audio.sample_rate' in notification.data.modified: alert_device = settings.audio.alert_device if alert_device not in (None, 'system_default') and alert_device not in self.engine.output_devices: alert_device = 'system_default' input_device = settings.audio.input_device if input_device not in (None, 'system_default') and input_device not in self.engine.input_devices: input_device = 'system_default' output_device = settings.audio.output_device if output_device not in (None, 'system_default') and output_device not in self.engine.output_devices: output_device = 'system_default' tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0 voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, tail_length) voice_mixer.muted = settings.audio.muted self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0) self.alert_audio_device = AudioDevice(alert_mixer) self.alert_audio_bridge = RootAudioBridge(alert_mixer) self.alert_audio_bridge.add(self.alert_audio_device) if settings.audio.silent: alert_mixer.output_volume = 0 settings.audio.input_device = voice_mixer.input_device settings.audio.output_device = voice_mixer.output_device settings.audio.alert_device = alert_mixer.output_device settings.save() else: if {'audio.input_device', 'audio.output_device', 'audio.alert_device', 'audio.echo_canceller.enabled', 'audio.echo_canceller.tail_length'}.intersection(notification.data.modified): input_device = settings.audio.input_device if input_device not in (None, 'system_default') and input_device not in self.engine.input_devices: input_device = 'system_default' output_device = settings.audio.output_device if output_device not in (None, 'system_default') and output_device not in self.engine.output_devices: output_device = 'system_default' tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0 if (input_device, output_device, tail_length) != attrgetter('input_device', 'output_device', 'ec_tail_length')(self.voice_audio_bridge.mixer): self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, tail_length) settings.audio.input_device = self.voice_audio_bridge.mixer.input_device settings.audio.output_device = self.voice_audio_bridge.mixer.output_device settings.save() alert_device = settings.audio.alert_device if alert_device not in (None, 'system_default') and alert_device not in self.engine.output_devices: alert_device = 'system_default' if alert_device != self.alert_audio_bridge.mixer.output_device: self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0) settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device settings.save() if 'audio.muted' in notification.data.modified: self.voice_audio_bridge.mixer.muted = settings.audio.muted if 'audio.silent' in notification.data.modified: if settings.audio.silent: self.alert_audio_bridge.mixer.output_volume = 0 else: self.alert_audio_bridge.mixer.output_volume = 100 if 'video.muted' in notification.data.modified: self.video_device.muted = settings.video.muted if {'video.h264.profile', 'video.h264.level'}.intersection(notification.data.modified): self.engine.set_h264_options(settings.video.h264.profile, settings.video.h264.level) if {'video.device', 'video.resolution', 'video.framerate', 'video.max_bitrate'}.intersection(notification.data.modified): if {'video.device', 'video.resolution', 'video.framerate'}.intersection(notification.data.modified) or settings.video.device != self.video_device.name: self.video_device.set_camera(settings.video.device, settings.video.resolution, settings.video.framerate) settings.video.device = self.video_device.name settings.save() self.engine.set_video_options(settings.video.resolution, settings.video.framerate, settings.video.max_bitrate) if 'user_agent' in notification.data.modified: self.engine.user_agent = settings.user_agent if 'sip.udp_port' in notification.data.modified: self.engine.set_udp_port(settings.sip.udp_port) if 'sip.tcp_port' in notification.data.modified: self.engine.set_tcp_port(settings.sip.tcp_port) if {'sip.tls_port', 'tls.ca_list', 'default_account'}.intersection(notification.data.modified): self._initialize_tls() if 'rtp.port_range' in notification.data.modified: self.engine.rtp_port_range = (settings.rtp.port_range.start, settings.rtp.port_range.end) if 'rtp.audio_codec_list' in notification.data.modified: self.engine.codecs = list(settings.rtp.audio_codec_list) if 'logs.trace_sip' in notification.data.modified: self.engine.trace_sip = settings.logs.trace_sip if {'logs.trace_pjsip', 'logs.pjsip_level'}.intersection(notification.data.modified): self.engine.log_level = settings.logs.pjsip_level if settings.logs.trace_pjsip else 0 elif notification.sender is account_manager.default_account: if {'tls.verify_server', 'tls.certificate'}.intersection(notification.data.modified): self._initialize_tls() @run_in_thread('device-io') def _NH_DefaultAudioDeviceDidChange(self, notification): if None in (self.voice_audio_bridge, self.alert_audio_bridge): return settings = SIPSimpleSettings() current_input_device = self.voice_audio_bridge.mixer.input_device current_output_device = self.voice_audio_bridge.mixer.output_device current_alert_device = self.alert_audio_bridge.mixer.output_device ec_tail_length = self.voice_audio_bridge.mixer.ec_tail_length if notification.data.changed_input and 'system_default' in (current_input_device, settings.audio.input_device): self.voice_audio_bridge.mixer.set_sound_devices('system_default', current_output_device, ec_tail_length) if notification.data.changed_output and 'system_default' in (current_output_device, settings.audio.output_device): self.voice_audio_bridge.mixer.set_sound_devices(current_input_device, 'system_default', ec_tail_length) if notification.data.changed_output and 'system_default' in (current_alert_device, settings.audio.alert_device): self.alert_audio_bridge.mixer.set_sound_devices(None, 'system_default', 0) @run_in_thread('device-io') def _NH_AudioDevicesDidChange(self, notification): old_devices = set(notification.data.old_devices) new_devices = set(notification.data.new_devices) removed_devices = old_devices - new_devices if not removed_devices: return input_device = self.voice_audio_bridge.mixer.input_device output_device = self.voice_audio_bridge.mixer.output_device alert_device = self.alert_audio_bridge.mixer.output_device if self.voice_audio_bridge.mixer.real_input_device in removed_devices: input_device = 'system_default' if new_devices else None if self.voice_audio_bridge.mixer.real_output_device in removed_devices: output_device = 'system_default' if new_devices else None if self.alert_audio_bridge.mixer.real_output_device in removed_devices: alert_device = 'system_default' if new_devices else None self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, self.voice_audio_bridge.mixer.ec_tail_length) self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0) settings = SIPSimpleSettings() settings.audio.input_device = self.voice_audio_bridge.mixer.input_device settings.audio.output_device = self.voice_audio_bridge.mixer.output_device settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device settings.save() @run_in_thread('device-io') def _NH_VideoDevicesDidChange(self, notification): old_devices = set(notification.data.old_devices) new_devices = set(notification.data.new_devices) removed_devices = old_devices - new_devices if not removed_devices: return device = self.video_device.name if self.video_device.real_name in removed_devices: device = 'system_default' if new_devices else None settings = SIPSimpleSettings() self.video_device.set_camera(device, settings.video.resolution, settings.video.framerate) settings.video.device = self.video_device.name settings.save() def _NH_DNSNameserversDidChange(self, notification): if self.running: self.engine.set_nameservers(notification.data.nameservers) notification.center.post_notification('NetworkConditionsDidChange', sender=self) def _NH_SystemIPAddressDidChange(self, notification): self._network_conditions_changed() def _NH_SystemDidWakeUpFromSleep(self, notification): self._network_conditions_changed() diff --git a/sipsimple/audio.py b/sipsimple/audio.py index 7790f7f8..b952f130 100644 --- a/sipsimple/audio.py +++ b/sipsimple/audio.py @@ -1,537 +1,537 @@ """Audio support""" __all__ = ['IAudioPort', 'AudioDevice', 'AudioBridge', 'RootAudioBridge', 'AudioConference', 'WavePlayer', 'WavePlayerError', 'WaveRecorder'] import os import weakref from functools import partial from itertools import combinations -from .threading import RLock +from threading import RLock -from .application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy -from .application.system import makedirs +from application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy +from application.system import makedirs from eventlib import coros from twisted.internet import reactor from zope.interface import Attribute, Interface, implementer from sipsimple.core import MixerPort, RecordingWaveFile, SIPCoreError, WaveFile from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread, run_in_waitable_green_thread class WavePlayerError(Exception): pass class IAudioPort(Interface): """ Interface describing an object which can produce and/or consume audio data. If an object cannot produce audio data, its producer_slot attribute must be None; similarly, if an object cannot consume audio data, its consumer_slot attribute must be None. As part of the interface, whenever an IAudioPort implementation changes its slot attributes, it must send a AudioPortDidChangeSlots notification with the following attributes in the notification data: * consumer_slot_changed * producer_slot_changed * old_consumer_slot (only required if consumer_slot_changed is True) * new_consumer_slot (only required if consumer_slot_changed is True) * old_producer_slot (only required if producer_slot_changed is True) * new_producer_slot (only required if producer_slot_changed is True) All attributes of this interface are read-only. """ mixer = Attribute("The mixer that is responsible for mixing the audio data to/from this audio port") consumer_slot = Attribute("The slot to which audio data can be written") producer_slot = Attribute("The slot from which audio data can be read") @implementer(IAudioPort) class AudioDevice(object): """ Objects of this class represent an audio device which can be used in an AudioBridge as they implement the IAudioPort interface. Since a mixer is connected to an audio device which provides the mixer's clock, an AudioDevice constructed for a specific mixer represents the device that mixer is using. """ def __init__(self, mixer, input_muted=False, output_muted=False): self.mixer = mixer self.__dict__['input_muted'] = input_muted self.__dict__['output_muted'] = output_muted @property def consumer_slot(self): return 0 if not self.output_muted else None @property def producer_slot(self): return 0 if not self.input_muted else None @property def input_muted(self): return self.__dict__['input_muted'] @input_muted.setter def input_muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for input_muted property: %r' % (value,)) if value == self.input_muted: return old_producer_slot = self.producer_slot self.__dict__['input_muted'] = value notification_center = NotificationCenter() notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) @property def output_muted(self): return self.__dict__['output_muted'] @output_muted.setter def output_muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for output_muted property: %r' % (value,)) if value == self.output_muted: return old_consumer_slot = self.consumer_slot self.__dict__['output_muted'] = value notification_center = NotificationCenter() notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=False, old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot)) @implementer(IAudioPort, IObserver) class AudioBridge(object): """ An AudioBridge is a container for objects providing the IAudioPort interface. It connects all such objects in a full-mesh such that all audio producers are connected to all consumers. AudioBridge implements the IAudioPort interface which means a bridge can contain another bridge. This must be done such that the resulting structure is a tree (i.e. no loops are allowed). All leafs of the tree will be connected as if they were the children of a single bridge. """ def __init__(self, mixer): self._lock = RLock() self.ports = set() self.mixer = mixer self.multiplexer = MixerPort(mixer) self.demultiplexer = MixerPort(mixer) self.multiplexer.start() self.demultiplexer.start() notification_center = NotificationCenter() notification_center.add_observer(ObserverWeakrefProxy(self), name='AudioPortDidChangeSlots') def __del__(self): self.multiplexer.stop() self.demultiplexer.stop() if len(self.ports) >= 2: for port1, port2 in ((wr1(), wr2()) for wr1, wr2 in combinations(self.ports, 2)): if port1 is None or port2 is None: continue if port1.producer_slot is not None and port2.consumer_slot is not None: self.mixer.disconnect_slots(port1.producer_slot, port2.consumer_slot) if port2.producer_slot is not None and port1.consumer_slot is not None: self.mixer.disconnect_slots(port2.producer_slot, port1.consumer_slot) self.ports.clear() def __contains__(self, port): return weakref.ref(port) in self.ports @property def consumer_slot(self): return self.demultiplexer.slot if self.demultiplexer.is_active else None @property def producer_slot(self): return self.multiplexer.slot if self.multiplexer.is_active else None def add(self, port): with self._lock: if not IAudioPort.providedBy(port): raise TypeError("expected object implementing IAudioPort, got %s" % port.__class__.__name__) if port.mixer is not self.mixer: raise ValueError("expected port with Mixer %r, got %r" % (self.mixer, port.mixer)) if weakref.ref(port) in self.ports: return if port.consumer_slot is not None and self.demultiplexer.slot is not None: self.mixer.connect_slots(self.demultiplexer.slot, port.consumer_slot) if port.producer_slot is not None and self.multiplexer.slot is not None: self.mixer.connect_slots(port.producer_slot, self.multiplexer.slot) for other in (wr() for wr in self.ports): if other is None: continue if other.producer_slot is not None and port.consumer_slot is not None: self.mixer.connect_slots(other.producer_slot, port.consumer_slot) if port.producer_slot is not None and other.consumer_slot is not None: self.mixer.connect_slots(port.producer_slot, other.consumer_slot) # This hack is required because a weakly referenced object keeps a # strong reference to weak references of itself and thus to any # callbacks registered in those weak references. To be more # precise, we don't want the port to have a strong reference to # ourselves. -Luci self.ports.add(weakref.ref(port, partial(self._remove_port, weakref.ref(self)))) def remove(self, port): with self._lock: if weakref.ref(port) not in self.ports: raise ValueError("port %r is not part of this bridge" % port) if port.consumer_slot is not None and self.demultiplexer.slot is not None: self.mixer.disconnect_slots(self.demultiplexer.slot, port.consumer_slot) if port.producer_slot is not None and self.multiplexer.slot is not None: self.mixer.disconnect_slots(port.producer_slot, self.multiplexer.slot) for other in (wr() for wr in self.ports): if other is None: continue if other.producer_slot is not None and port.consumer_slot is not None: self.mixer.disconnect_slots(other.producer_slot, port.consumer_slot) if port.producer_slot is not None and other.consumer_slot is not None: self.mixer.disconnect_slots(port.producer_slot, other.consumer_slot) self.ports.remove(weakref.ref(port)) def stop(self): with self._lock: for port1 in (wr() for wr in self.ports): if port1 is None: continue for port2 in (wr() for wr in self.ports): if port2 is None or port2 is port1: continue if port1.producer_slot is not None and port2.consumer_slot is not None: self.mixer.disconnect_slots(port1.producer_slot, port2.consumer_slot) if port2.producer_slot is not None and port1.consumer_slot is not None: self.mixer.disconnect_slots(port2.producer_slot, port1.consumer_slot) self.ports.clear() self.multiplexer.stop() self.demultiplexer.stop() def handle_notification(self, notification): with self._lock: if weakref.ref(notification.sender) not in self.ports: return if notification.data.consumer_slot_changed: if notification.data.old_consumer_slot is not None and self.demultiplexer.slot is not None: self.mixer.disconnect_slots(self.demultiplexer.slot, notification.data.old_consumer_slot) if notification.data.new_consumer_slot is not None and self.demultiplexer.slot is not None: self.mixer.connect_slots(self.demultiplexer.slot, notification.data.new_consumer_slot) for other in (wr() for wr in self.ports): if other is None or other is notification.sender or other.producer_slot is None: continue if notification.data.old_consumer_slot is not None: self.mixer.disconnect_slots(other.producer_slot, notification.data.old_consumer_slot) if notification.data.new_consumer_slot is not None: self.mixer.connect_slots(other.producer_slot, notification.data.new_consumer_slot) if notification.data.producer_slot_changed: if notification.data.old_producer_slot is not None and self.multiplexer.slot is not None: self.mixer.disconnect_slots(notification.data.old_producer_slot, self.multiplexer.slot) if notification.data.new_producer_slot is not None and self.multiplexer.slot is not None: self.mixer.connect_slots(notification.data.new_producer_slot, self.multiplexer.slot) for other in (wr() for wr in self.ports): if other is None or other is notification.sender or other.consumer_slot is None: continue if notification.data.old_producer_slot is not None: self.mixer.disconnect_slots(notification.data.old_producer_slot, other.consumer_slot) if notification.data.new_producer_slot is not None: self.mixer.connect_slots(notification.data.new_producer_slot, other.consumer_slot) @staticmethod def _remove_port(selfwr, portwr): self = selfwr() if self is not None: with self._lock: self.ports.discard(portwr) @implementer(IObserver) class RootAudioBridge(object): """ A RootAudioBridge is a container for objects providing the IAudioPort interface. It connects all such objects in a full-mesh such that all audio producers are connected to all consumers. The difference between a RootAudioBridge and an AudioBridge is that the RootAudioBridge does not implement the IAudioPort interface. This makes it more efficient. """ def __init__(self, mixer): self.mixer = mixer self.ports = set() self._lock = RLock() notification_center = NotificationCenter() notification_center.add_observer(ObserverWeakrefProxy(self), name='AudioPortDidChangeSlots') def __del__(self): if len(self.ports) >= 2: for port1, port2 in ((wr1(), wr2()) for wr1, wr2 in combinations(self.ports, 2)): if port1 is None or port2 is None: continue if port1.producer_slot is not None and port2.consumer_slot is not None: self.mixer.disconnect_slots(port1.producer_slot, port2.consumer_slot) if port2.producer_slot is not None and port1.consumer_slot is not None: self.mixer.disconnect_slots(port2.producer_slot, port1.consumer_slot) self.ports.clear() def __contains__(self, port): return weakref.ref(port) in self.ports def add(self, port): with self._lock: if not IAudioPort.providedBy(port): raise TypeError("expected object implementing IAudioPort, got %s" % port.__class__.__name__) if port.mixer is not self.mixer: raise ValueError("expected port with Mixer %r, got %r" % (self.mixer, port.mixer)) if weakref.ref(port) in self.ports: return for other in (wr() for wr in self.ports): if other is None: continue if other.producer_slot is not None and port.consumer_slot is not None: self.mixer.connect_slots(other.producer_slot, port.consumer_slot) if port.producer_slot is not None and other.consumer_slot is not None: self.mixer.connect_slots(port.producer_slot, other.consumer_slot) # This hack is required because a weakly referenced object keeps a # strong reference to weak references of itself and thus to any # callbacks registered in those weak references. To be more # precise, we don't want the port to have a strong reference to # ourselves. -Luci self.ports.add(weakref.ref(port, partial(self._remove_port, weakref.ref(self)))) def remove(self, port): with self._lock: if weakref.ref(port) not in self.ports: raise ValueError("port %r is not part of this bridge" % port) for other in (wr() for wr in self.ports): if other is None: continue if other.producer_slot is not None and port.consumer_slot is not None: self.mixer.disconnect_slots(other.producer_slot, port.consumer_slot) if port.producer_slot is not None and other.consumer_slot is not None: self.mixer.disconnect_slots(port.producer_slot, other.consumer_slot) self.ports.remove(weakref.ref(port)) def handle_notification(self, notification): with self._lock: if weakref.ref(notification.sender) not in self.ports: return if notification.data.consumer_slot_changed: for other in (wr() for wr in self.ports): if other is None or other is notification.sender or other.producer_slot is None: continue if notification.data.old_consumer_slot is not None: self.mixer.disconnect_slots(other.producer_slot, notification.data.old_consumer_slot) if notification.data.new_consumer_slot is not None: self.mixer.connect_slots(other.producer_slot, notification.data.new_consumer_slot) if notification.data.producer_slot_changed: for other in (wr() for wr in self.ports): if other is None or other is notification.sender or other.consumer_slot is None: continue if notification.data.old_producer_slot is not None: self.mixer.disconnect_slots(notification.data.old_producer_slot, other.consumer_slot) if notification.data.new_producer_slot is not None: self.mixer.connect_slots(notification.data.new_producer_slot, other.consumer_slot) @staticmethod def _remove_port(selfwr, portwr): self = selfwr() if self is not None: with self._lock: self.ports.discard(portwr) class AudioConference(object): def __init__(self): from sipsimple.application import SIPApplication mixer = SIPApplication.voice_audio_mixer self.bridge = RootAudioBridge(mixer) self.device = AudioDevice(mixer) self.on_hold = False self.streams = [] self._lock = RLock() self.bridge.add(self.device) def add(self, stream): with self._lock: if stream in self.streams: return stream.bridge.remove(stream.device) self.bridge.add(stream.bridge) self.streams.append(stream) def remove(self, stream): with self._lock: self.streams.remove(stream) self.bridge.remove(stream.bridge) stream.bridge.add(stream.device) def hold(self): with self._lock: if self.on_hold: return self.bridge.remove(self.device) self.on_hold = True def unhold(self): with self._lock: if not self.on_hold: return self.bridge.add(self.device) self.on_hold = False @implementer(IAudioPort, IObserver) class WavePlayer(object): """ An object capable of playing a WAV file. It can be used as part of an AudioBridge as it implements the IAudioPort interface. """ def __init__(self, mixer, filename, volume=100, loop_count=1, pause_time=0, initial_delay=0): self.mixer = mixer self.filename = filename self.initial_delay = initial_delay self.loop_count = loop_count self.pause_time = pause_time self.volume = volume self._channel = None self._current_loop = 0 self._state = 'stopped' self._wave_file = None @property def is_active(self): return self._state == "started" @property def consumer_slot(self): return None @property def producer_slot(self): return self._wave_file.slot if self._wave_file else None def start(self): self.play() @run_in_green_thread # run stop in a green thread in order to be similar with start/play. this avoids start/stop running out of order. def stop(self): if self._state != 'started': return self._channel.send(Command('stop')) @run_in_waitable_green_thread def play(self): if self._state != 'stopped': raise WavePlayerError('already playing') self._state = 'started' self._channel = coros.queue() self._current_loop = 0 if self.initial_delay: reactor.callLater(self.initial_delay, self._channel.send, Command('play')) else: self._channel.send(Command('play')) self._run().wait() @run_in_waitable_green_thread def _run(self): notification_center = NotificationCenter() try: while True: command = self._channel.wait() if command.name == 'play': self._wave_file = WaveFile(self.mixer, self.filename) notification_center.add_observer(self, sender=self._wave_file, name='WaveFileDidFinishPlaying') self._wave_file.volume = self.volume try: self._wave_file.start() except SIPCoreError as e: notification_center.post_notification('WavePlayerDidFail', sender=self, data=NotificationData(error=e)) raise WavePlayerError(e) else: if self._current_loop == 0: notification_center.post_notification('WavePlayerDidStart', sender=self) notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=None, new_producer_slot=self._wave_file.slot)) elif command.name == 'reschedule': self._current_loop += 1 notification_center.remove_observer(self, sender=self._wave_file, name='WaveFileDidFinishPlaying') self._wave_file = None notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=None, new_producer_slot=None)) if self.loop_count == 0 or self._current_loop < self.loop_count: reactor.callLater(self.pause_time, self._channel.send, Command('play')) else: notification_center.post_notification('WavePlayerDidEnd', sender=self) break elif command.name == 'stop': if self._wave_file is not None: notification_center.remove_observer(self, sender=self._wave_file, name='WaveFileDidFinishPlaying') self._wave_file.stop() self._wave_file = None notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=None, new_producer_slot=None)) notification_center.post_notification('WavePlayerDidEnd', sender=self) break finally: self._channel = None self._state = 'stopped' @run_in_twisted_thread def handle_notification(self, notification): if self._channel is not None: self._channel.send(Command('reschedule')) @implementer(IAudioPort) class WaveRecorder(object): """ An object capable of recording to a WAV file. It can be used as part of an AudioBridge as it implements the IAudioPort interface. """ def __init__(self, mixer, filename): self.mixer = mixer self.filename = filename self._recording_wave_file = None @property def is_active(self): return bool(self._recording_wave_file and self._recording_wave_file.is_active) @property def consumer_slot(self): return self._recording_wave_file.slot if self._recording_wave_file else None @property def producer_slot(self): return None def start(self): # There is still a race condition here in that the directory can be removed # before the PJSIP opens the file. There's nothing that can be done about # it as long as PJSIP doesn't accept an already open file descriptor. -Luci makedirs(os.path.dirname(self.filename)) self._recording_wave_file = RecordingWaveFile(self.mixer, self.filename) self._recording_wave_file.start() notification_center = NotificationCenter() notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=False, old_consumer_slot=None, new_consumer_slot=self._recording_wave_file.slot)) def stop(self): old_slot = self.consumer_slot self._recording_wave_file.stop() self._recording_wave_file = None notification_center = NotificationCenter() notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=False, old_consumer_slot=old_slot, new_consumer_slot=None)) diff --git a/sipsimple/logging.py b/sipsimple/logging.py index 34a0529b..ca136680 100644 --- a/sipsimple/logging.py +++ b/sipsimple/logging.py @@ -1,7 +1,7 @@ -from .application import log +from application import log # Use a named logger for sipsimple logging log = log.get_logger('sipsimple') diff --git a/sipsimple/lookup.py b/sipsimple/lookup.py index 2333a525..aa56a3da 100644 --- a/sipsimple/lookup.py +++ b/sipsimple/lookup.py @@ -1,557 +1,557 @@ """ Implements DNS lookups in the context of SIP, STUN and MSRP relay based on RFC3263 and related standards. This can be used to determine the next hop(s) and failover for routing of SIP messages and reservation of network resources prior the starting of a SIP session. """ import re from itertools import chain from time import time from urllib.parse import urlparse # patch dns.entropy module which is not thread-safe import dns import sys from functools import partial from random import randint, randrange dns.entropy = dns.__class__('dns.entropy') dns.entropy.__file__ = dns.__file__.replace('__init__.py', 'entropy.py') dns.entropy.__builtins__ = dns.__builtins__ dns.entropy.random_16 = partial(randrange, 2**16) dns.entropy.between = randint sys.modules['dns.entropy'] = dns.entropy del partial, randint, randrange, sys # replace standard select and socket modules with versions from eventlib from eventlib import coros, proc from eventlib.green import select from eventlib.green import socket import dns.name import dns.resolver import dns.query dns.resolver.socket = socket dns.query.socket = socket dns.query.select = select dns.query._set_polling_backend(dns.query._select_for) -from .application.notification import IObserver, NotificationCenter, NotificationData -from .application.python import Null, limit -from .application.python.decorator import decorator, preserve_signature -from .application.python.types import Singleton +from application.notification import IObserver, NotificationCenter, NotificationData +from application.python import Null, limit +from application.python.decorator import decorator, preserve_signature +from application.python.types import Singleton from dns import exception, rdatatype from twisted.internet import reactor from zope.interface import implementer from sipsimple.core import Route from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, InterruptCommand, run_in_waitable_green_thread def domain_iterator(domain): """ A generator which returns the domain and its parent domains. """ while domain not in ('.', ''): yield domain domain = (domain.split('.', 1)+[''])[1] @decorator def post_dns_lookup_notifications(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): notification_center = NotificationCenter() try: result = func(obj, *args, **kwargs) except DNSLookupError as e: notification_center.post_notification('DNSLookupDidFail', sender=obj, data=NotificationData(error=str(e))) raise else: notification_center.post_notification('DNSLookupDidSucceed', sender=obj, data=NotificationData(result=result)) return result return wrapper class DNSLookupError(Exception): """ The error raised by DNSLookup when a lookup cannot be performed. """ class DNSCache(object): """ A simple DNS cache which uses twisted's timers to invalidate its expired data. """ def __init__(self): self.data = {} def get(self, key): return self.data.get(key, None) def put(self, key, value): expiration = value.expiration-time() if expiration > 0: self.data[key] = value reactor.callLater(limit(expiration, max=3600), self.data.pop, key, None) def flush(self, key=None): if key is not None: self.data.pop(key, None) else: self.data = {} class InternalResolver(dns.resolver.Resolver): def __init__(self, *args, **kw): super(InternalResolver, self).__init__(*args, **kw) if self.domain.to_text().endswith('local.'): self.domain = dns.name.root self.search = [item for item in self.search if not item.to_text().endswith('local.')] class DNSResolver(dns.resolver.Resolver): """ The resolver used by DNSLookup. The lifetime setting on it applies to all the queries made on this resolver. Each time a query is performed, its duration is subtracted from the lifetime value. """ def __init__(self): dns.resolver.Resolver.__init__(self, configure=False) dns_manager = DNSManager() self.search = dns_manager.search self.domain = dns_manager.domain self.nameservers = dns_manager.nameservers def query(self, *args, **kw): start_time = time() try: return dns.resolver.Resolver.query(self, *args, **kw) finally: self.lifetime -= min(self.lifetime, time()-start_time) class SRVResult(object): """ Internal object used to save the result of SRV queries. """ def __init__(self, priority, weight, port, address): self.priority = priority self.weight = weight self.port = port self.address = address class NAPTRResult(object): """ Internal object used to save the result of NAPTR queries. """ def __init__(self, service, order, preference, priority, weight, port, address): self.service = service self.order = order self.preference = preference self.priority = priority self.weight = weight self.port = port self.address = address class DNSLookup(object): cache = DNSCache() @run_in_waitable_green_thread @post_dns_lookup_notifications def lookup_service(self, uri, service, timeout=3.0, lifetime=15.0): """ Performs an SRV query to determine the servers used for the specified service from the domain in uri.host. If this fails and falling back is supported, also performs an A query on uri.host, returning the default port of the service along with the IP addresses in the answer. The services supported are `stun' and 'msrprelay'. The DNSLookupDidSucceed notification contains a result attribute which is a list of (address, port) tuples. The DNSLookupDidFail notification contains an error attribute describing the error encountered. """ service_srv_record_map = {"stun": ("_stun._udp", 3478, False), "msrprelay": ("_msrps._tcp", 2855, True)} log_context = dict(context='lookup_service', service=service, uri=uri) try: service_prefix, service_port, service_fallback = service_srv_record_map[service] except KeyError: raise DNSLookupError("Unknown service: %s" % service) try: # If the host part of the URI is an IP address, we will not do any lookup if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host): return [(uri.host, uri.port or service_port)] resolver = DNSResolver() resolver.cache = self.cache resolver.timeout = timeout resolver.lifetime = lifetime record_name = '%s.%s' % (service_prefix, uri.host) services = self._lookup_srv_records(resolver, [record_name], log_context=log_context) if services[record_name]: return [(result.address, result.port) for result in services[record_name]] elif service_fallback: addresses = self._lookup_a_records(resolver, [uri.host], log_context=log_context) if addresses[uri.host]: return [(addr, service_port) for addr in addresses[uri.host]] except dns.resolver.Timeout: raise DNSLookupError('Timeout in lookup for %s servers for domain %s' % (service, uri.host)) else: raise DNSLookupError('No %s servers found for domain %s' % (service, uri.host)) @run_in_waitable_green_thread @post_dns_lookup_notifications def lookup_sip_proxy(self, uri, supported_transports, timeout=3.0, lifetime=15.0): """ Performs an RFC 3263 compliant lookup of transport/ip/port combinations for a particular SIP URI. As arguments it takes a SIPURI object and a list of supported transports, in order of preference of the application. It returns a list of Route objects that can be used in order of preference. The DNSLookupDidSucceed notification contains a result attribute which is a list of Route objects. The DNSLookupDidFail notification contains an error attribute describing the error encountered. """ naptr_service_transport_map = {"sips+d2t": "tls", "sip+d2t": "tcp", "sip+d2u": "udp"} transport_service_map = {"udp": "_sip._udp", "tcp": "_sip._tcp", "tls": "_sips._tcp"} log_context = dict(context='lookup_sip_proxy', uri=uri) if not supported_transports: raise DNSLookupError("No transports are supported") supported_transports = [transport.lower() for transport in supported_transports] unknown_transports = set(supported_transports).difference(transport_service_map) if unknown_transports: raise DNSLookupError("Unknown transports: %s" % ', '.join(unknown_transports)) try: # If the host part of the URI is an IP address, we will not do any lookup if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host): transport = 'tls' if uri.secure else uri.transport.lower() if transport not in supported_transports: raise DNSLookupError("Transport %s dictated by URI is not supported" % transport) port = uri.port or (5061 if transport=='tls' else 5060) return [Route(address=uri.host, port=port, transport=transport)] resolver = DNSResolver() resolver.cache = self.cache resolver.timeout = timeout resolver.lifetime = lifetime # If the port is specified in the URI, we will only do an A lookup if uri.port: transport = 'tls' if uri.secure else uri.transport.lower() if transport not in supported_transports: raise DNSLookupError("Transport %s dictated by URI is not supported" % transport) addresses = self._lookup_a_records(resolver, [uri.host], log_context=log_context) if addresses[uri.host]: return [Route(address=addr, port=uri.port, transport=transport) for addr in addresses[uri.host]] # If the transport was already set as a parameter on the SIP URI, only do SRV lookups elif 'transport' in uri.parameters: transport = uri.parameters['transport'].lower() if transport not in supported_transports: raise DNSLookupError("Requested lookup for URI with %s transport, but it is not supported" % transport) if uri.secure and transport != 'tls': raise DNSLookupError("Requested lookup for SIPS URI, but with %s transport parameter" % transport) record_name = '%s.%s' % (transport_service_map[transport], uri.host) services = self._lookup_srv_records(resolver, [record_name], log_context=log_context) if services[record_name]: return [Route(address=result.address, port=result.port, transport=transport) for result in services[record_name]] else: # If SRV lookup fails, try A lookup addresses = self._lookup_a_records(resolver, [uri.host], log_context=log_context) port = 5061 if transport=='tls' else 5060 if addresses[uri.host]: return [Route(address=addr, port=port, transport=transport) for addr in addresses[uri.host]] # Otherwise, it means we don't have a numeric IP address, a port isn't specified and neither is a transport. So we have to do a full NAPTR lookup else: # If the URI is a SIPS URI, we only support the TLS transport. if uri.secure: if 'tls' not in supported_transports: raise DNSLookupError("Requested lookup for SIPS URI, but TLS transport is not supported") supported_transports = ['tls'] # First try NAPTR lookup naptr_services = [service for service, transport in list(naptr_service_transport_map.items()) if transport in supported_transports] try: pointers = self._lookup_naptr_record(resolver, uri.host, naptr_services, log_context=log_context) except dns.resolver.Timeout: pointers = [] if pointers: return [Route(address=result.address, port=result.port, transport=naptr_service_transport_map[result.service]) for result in pointers] else: # If that fails, try SRV lookup routes = [] for transport in supported_transports: record_name = '%s.%s' % (transport_service_map[transport], uri.host) try: services = self._lookup_srv_records(resolver, [record_name], log_context=log_context) except dns.resolver.Timeout: continue if services[record_name]: routes.extend(Route(address=result.address, port=result.port, transport=transport) for result in services[record_name]) if routes: return routes else: # If SRV lookup fails, try A lookup transport = 'tls' if uri.secure else 'udp' if transport in supported_transports: addresses = self._lookup_a_records(resolver, [uri.host], log_context=log_context) port = 5061 if transport=='tls' else 5060 if addresses[uri.host]: return [Route(address=addr, port=port, transport=transport) for addr in addresses[uri.host]] except dns.resolver.Timeout: raise DNSLookupError("Timeout in lookup for routes for SIP URI %s" % uri) else: raise DNSLookupError("No routes found for SIP URI %s" % uri) @run_in_waitable_green_thread @post_dns_lookup_notifications def lookup_xcap_server(self, uri, timeout=3.0, lifetime=15.0): """ Performs a TXT query against xcap. and returns all results that look like HTTP URIs. """ log_context = dict(context='lookup_xcap_server', uri=uri) notification_center = NotificationCenter() try: # If the host part of the URI is an IP address, we cannot not do any lookup if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host): raise DNSLookupError("Cannot perform DNS query because the host is an IP address") resolver = DNSResolver() resolver.cache = self.cache resolver.timeout = timeout resolver.lifetime = lifetime record_name = 'xcap.%s' % uri.host results = [] try: answer = resolver.query(record_name, rdatatype.TXT) except dns.resolver.Timeout as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) raise except exception.DNSException as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) else: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=answer, error=None, **log_context)) for result_uri in list(chain(*(r.strings for r in answer.rrset))): parsed_uri = urlparse(result_uri) if parsed_uri.scheme in ('http', 'https') and parsed_uri.netloc: results.append(result_uri) if not results: raise DNSLookupError('No XCAP servers found for domain %s' % uri.host) return results except dns.resolver.Timeout: raise DNSLookupError('Timeout in lookup for XCAP servers for domain %s' % uri.host) def _lookup_a_records(self, resolver, hostnames, additional_records=[], log_context={}): notification_center = NotificationCenter() additional_addresses = dict((rset.name.to_text(), rset) for rset in additional_records if rset.rdtype == rdatatype.A) addresses = {} for hostname in hostnames: if hostname in additional_addresses: addresses[hostname] = [r.address for r in additional_addresses[hostname]] else: try: answer = resolver.query(hostname, rdatatype.A) except dns.resolver.Timeout as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) raise except exception.DNSException as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) addresses[hostname] = [] else: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=answer, error=None, **log_context)) addresses[hostname] = [r.address for r in answer.rrset] return addresses def _lookup_srv_records(self, resolver, srv_names, additional_records=[], log_context={}): notification_center = NotificationCenter() additional_services = dict((rset.name.to_text(), rset) for rset in additional_records if rset.rdtype == rdatatype.SRV) services = {} for srv_name in srv_names: services[srv_name] = [] if srv_name in additional_services: addresses = self._lookup_a_records(resolver, [r.target.to_text() for r in additional_services[srv_name]], additional_records) for record in additional_services[srv_name]: services[srv_name].extend(SRVResult(record.priority, record.weight, record.port, addr) for addr in addresses.get(record.target.to_text(), ())) else: try: answer = resolver.query(srv_name, rdatatype.SRV) except dns.resolver.Timeout as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) raise except exception.DNSException as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) else: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=answer, error=None, **log_context)) addresses = self._lookup_a_records(resolver, [r.target.to_text() for r in answer.rrset], answer.response.additional, log_context) for record in answer.rrset: services[srv_name].extend(SRVResult(record.priority, record.weight, record.port, addr) for addr in addresses.get(record.target.to_text(), ())) services[srv_name].sort(key=lambda result: (result.priority, -result.weight)) return services def _lookup_naptr_record(self, resolver, domain, services, log_context={}): notification_center = NotificationCenter() pointers = [] try: answer = resolver.query(domain, rdatatype.NAPTR) except dns.resolver.Timeout as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) raise except exception.DNSException as e: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=None, error=e, **log_context)) else: notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=answer, error=None, **log_context)) records = [r for r in answer.rrset if r.service.lower() in services] services = self._lookup_srv_records(resolver, [r.replacement.to_text() for r in records], answer.response.additional, log_context) for record in records: pointers.extend(NAPTRResult(record.service.lower(), record.order, record.preference, r.priority, r.weight, r.port, r.address) for r in services.get(record.replacement.to_text(), ())) pointers.sort(key=lambda result: (result.order, result.preference)) return pointers @implementer(IObserver) class DNSManager(object, metaclass=Singleton): def __init__(self): default_resolver = InternalResolver() self.search = default_resolver.search self.domain = default_resolver.domain self.nameservers = default_resolver.nameservers self.google_nameservers = ['8.8.8.8', '8.8.4.4'] self.probed_domain = 'sip2sip.info.' self._channel = coros.queue() self._proc = None self._timer = None self._wakeup_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') @property def nameservers(self): return self.__dict__['nameservers'] @nameservers.setter def nameservers(self, value): old_value = self.__dict__.get('nameservers', Null) self.__dict__['nameservers'] = value if old_value is Null: NotificationCenter().post_notification('DNSResolverDidInitialize', sender=self, data=NotificationData(nameservers=value)) elif value != old_value: NotificationCenter().post_notification('DNSNameserversDidChange', sender=self, data=NotificationData(nameservers=value)) def start(self): self._proc = proc.spawn(self._run) self._channel.send(Command('probe_dns')) def stop(self): if self._proc is not None: self._proc.kill() self._proc = None if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None def _run(self): while True: try: command = self._channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) except InterruptCommand: pass def _CH_probe_dns(self, command): if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None resolver = InternalResolver() self.domain = resolver.domain self.search = resolver.search local_nameservers = resolver.nameservers # probe local resolver resolver.timeout = 1 resolver.lifetime = 3 try: answer = resolver.query(self.probed_domain, rdatatype.NAPTR) if not any(record.rdtype == rdatatype.NAPTR for record in answer.rrset): raise exception.DNSException("No NAPTR records found") answer = resolver.query("_sip._udp.%s" % self.probed_domain, rdatatype.SRV) if not any(record.rdtype == rdatatype.SRV for record in answer.rrset): raise exception.DNSException("No SRV records found") except (dns.resolver.Timeout, exception.DNSException): pass else: self.nameservers = resolver.nameservers return # local resolver failed. probe google resolver resolver.nameservers = self.google_nameservers resolver.timeout = 2 resolver.lifetime = 4 try: answer = resolver.query(self.probed_domain, rdatatype.NAPTR) if not any(record.rdtype == rdatatype.NAPTR for record in answer.rrset): raise exception.DNSException("No NAPTR records found") except (dns.resolver.Timeout, exception.DNSException): pass else: self.nameservers = resolver.nameservers return # google resolver failed. fallback to local resolver and schedule another probe for later self.nameservers = local_nameservers self._timer = reactor.callLater(15, self._channel.send, Command('probe_dns')) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SystemIPAddressDidChange(self, notification): self._proc.kill(InterruptCommand) self._channel.send(Command('probe_dns')) def _NH_SystemDidWakeUpFromSleep(self, notification): if self._wakeup_timer is None: def wakeup_action(): self._proc.kill(InterruptCommand) self._channel.send(Command('probe_dns')) self._wakeup_timer = None self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize diff --git a/sipsimple/session.py b/sipsimple/session.py index c2d178ad..c7fdad82 100644 --- a/sipsimple/session.py +++ b/sipsimple/session.py @@ -1,2730 +1,2730 @@ """ Implements an asynchronous notification based mechanism for establishment, modification and termination of sessions using Session Initiation Protocol (SIP) standardized in RFC3261. """ __all__ = ['Session', 'SessionManager'] import random -from .threading import RLock +from threading import RLock from time import time -from .application.notification import IObserver, Notification, NotificationCenter, NotificationData -from .application.python import Null, limit -from .application.python.decorator import decorator, preserve_signature -from .application.python.types import Singleton -from .application.system import host +from application.notification import IObserver, Notification, NotificationCenter, NotificationData +from application.python import Null, limit +from application.python.decorator import decorator, preserve_signature +from application.python.types import Singleton +from application.system import host from eventlib import api, coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple import log from sipsimple.account import AccountManager, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import DialogID, Engine, Invitation, Referral, Subscription, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, SIPURI, sip_status_messages, sipfrag_re from sipsimple.core import ContactHeader, FromHeader, Header, ReasonHeader, ReferToHeader, ReplacesHeader, RouteHeader, ToHeader, WarningHeader from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession from sipsimple.core import PublicGRUU, PublicGRUUIfAvailable, NoGRUU from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class TerminateReferral(Exception): pass class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class IllegalStateError(RuntimeError): pass class IllegalDirectionError(RuntimeError): pass class SIPInvitationTransferDidFail(Exception): def __init__(self, data): self.data = data @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker @decorator def check_transfer_state(direction, state): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.transfer_handler.direction != direction: raise IllegalDirectionError('cannot transfer in %s direction' % obj.transfer_handler.direction) if obj.transfer_handler.state != state: raise IllegalStateError('cannot transfer in %s state' % obj.transfer_handler.state) return func(obj, *args, **kwargs) return wrapper return state_checker class AddParticipantOperation(object): pass class RemoveParticipantOperation(object): pass @implementer(IObserver) class ReferralHandler(object): def __init__(self, session, participant_uri, operation): self.participant_uri = participant_uri if not isinstance(self.participant_uri, SIPURI): if not self.participant_uri.startswith(('sip:', 'sips:')): self.participant_uri = 'sip:%s' % self.participant_uri try: self.participant_uri = SIPURI.parse(self.participant_uri) except SIPCoreError: notification_center = NotificationCenter() if operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) return self.session = session self.operation = operation self.active = False self.route = None self._channel = coros.queue() self._referral = None def start(self): notification_center = NotificationCenter() if not self.session.remote_focus: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) self.session = None return notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise ReferralError(error='DNS lookup failed: %s' % e) target_uri = SIPURI.new(self.session.remote_identity.uri) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue refer_to_header = ReferToHeader(str(self.participant_uri)) refer_to_header.parameters['method'] = 'INVITE' if self.operation is AddParticipantOperation else 'BYE' referral = Referral(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralGotNotify': if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') if code/100 > 2: continue if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceGotAddParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) else: notification_center.post_notification('SIPConferenceGotRemoveParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) elif notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) else: notification_center.post_notification('SIPConferenceDidRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) finally: self.active = False except TerminateReferral: if self._referral is not None: try: self._referral.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail: pass finally: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) except ReferralError as e: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) finally: notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self.session = None self._referral = None def _refresh(self): try: contact_header = ContactHeader(self.session.account.contact[NoGRUU, self.route]) except KeyError: pass else: try: self._referral.refresh(contact_header=contact_header, timeout=2) except (SIPCoreError, SIPCoreInvalidStateError): pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_SIPSessionDidFail(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_SIPSessionWillEnd(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() @implementer(IObserver) class ConferenceHandler(object): def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) @run_in_green_thread def add_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, AddParticipantOperation) referral_handler.start() @run_in_green_thread def remove_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, RemoveParticipantOperation) referral_handler.start() def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) default_interval = 600 if account is BonjourAccount() else account.sip.subscribe_interval refresh_interval = getattr(command, 'refresh_interval', default_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError as e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class TransferInfo(object): def __init__(self, referred_by=None, replaced_dialog_id=None): self.referred_by = referred_by self.replaced_dialog_id = replaced_dialog_id @implementer(IObserver) class TransferHandler(object): def __init__(self, session): self.state = None self.direction = None self.new_session = None self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session._invitation) self._command_channel = coros.queue() self._data_channel = coros.queue() self._proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) self.direction = None self.state = None def _CH_incoming_transfer(self, command): self.direction = 'incoming' notification_center = NotificationCenter() refer_to_hdr = command.data.headers.get('Refer-To') target = SIPURI.parse(refer_to_hdr.uri) referred_by_hdr = command.data.headers.get('Referred-By', None) if referred_by_hdr is not None: origin = referred_by_hdr.body else: origin = str(self.session.remote_identity.uri) try: while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: self.state = 'failed' return else: if notification.name == 'SIPInvitationTransferDidStart': self.state = 'starting' refer_to_uri = SIPURI.new(target) refer_to_uri.headers = {} refer_to_uri.parameters = {} notification_center.post_notification('SIPSessionTransferNewIncoming', self.session, NotificationData(transfer_destination=refer_to_uri)) elif notification.name == 'SIPSessionTransferDidStart': break elif notification.name == 'SIPSessionTransferDidFail': self.state = 'failed' try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: return while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return self.state = 'started' transfer_info = TransferInfo(referred_by=origin) try: replaces_hdr = target.headers.pop('Replaces') call_id, rest = replaces_hdr.split(';', 1) params = dict((item.split('=') for item in rest.split(';'))) to_tag = params.get('to-tag') from_tag = params.get('from-tag') except (KeyError, ValueError): pass else: transfer_info.replaced_dialog_id = DialogID(call_id, local_tag=from_tag, remote_tag=to_tag) settings = SIPSimpleSettings() account = self.session.account if account is BonjourAccount(): uri = target elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = target lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=0, reason="DNS lookup failed: {}".format(e))) try: self.session._invitation.notify_transfer_progress(480) except SIPCoreError: return while True: try: self._data_channel.wait() except SIPInvitationTransferDidFail: return return self.new_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.new_session) self.new_session.connect(ToHeader(target), routes=routes, streams=[MediaStreamRegistry.AudioStream()], transfer_info=transfer_info) while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return if notification.name == 'SIPInvitationTransferDidEnd': return except proc.ProcExit: if self.new_session is not None: notification_center.remove_observer(self, sender=self.new_session) self.new_session = None raise def _CH_outgoing_transfer(self, command): self.direction = 'outgoing' notification_center = NotificationCenter() self.state = 'starting' while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason)) return if notification.name == 'SIPInvitationTransferDidStart': self.state = 'started' notification_center.post_notification('SIPSessionTransferDidStart', sender=self.session) elif notification.name == 'SIPInvitationTransferDidEnd': self.state = 'ended' self.session.end() notification_center.post_notification('SIPSessionTransferDidEnd', sender=self.session) return def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session._invitation) notification_center.remove_observer(self, sender=self.session) self._proc.kill() self._proc = None self._command_channel = None self._data_channel = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._command_channel.send(Command('incoming_transfer', data=notification.data)) def _NH_SIPInvitationTransferNewOutgoing(self, notification): self._command_channel.send(Command('outgoing_transfer', data=notification.data)) def _NH_SIPInvitationTransferDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferDidFail(self, notification): self._data_channel.send_exception(SIPInvitationTransferDidFail(notification.data)) def _NH_SIPInvitationTransferDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferGotNotify(self, notification): if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') notification.center.post_notification('SIPSessionTransferGotProgress', sender=self.session, data=NotificationData(code=code, reason=reason)) def _NH_SIPSessionTransferDidStart(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionTransferDidFail(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionGotRingIndication(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(180) except SIPCoreError: pass def _NH_SIPSessionGotProvisionalResponse(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: pass def _NH_SIPSessionDidStart(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(200) except SIPCoreError: pass self.state = 'ended' self.session.end() def _NH_SIPSessionDidEnd(self, notification): if notification.sender is self.new_session: # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=500, reason='internal error')) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(500) except SIPCoreError: pass self.state = 'failed' else: self._terminate() def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=notification.data.code or 500, reason=notification.data.reason)) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(notification.data.code or 500, notification.data.reason) except SIPCoreError: pass self.state = 'failed' else: self._terminate() class OptionalTag(str): def __eq__(self, other): return other is None or super(OptionalTag, self).__eq__(other) def __ne__(self, other): return not self == other def __repr__(self): return '{}({})'.format(self.__class__.__name__, super(OptionalTag, self).__repr__()) @implementer(IObserver) class SessionReplaceHandler(object): def __init__(self, session): self.session = session def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session.replaced_session) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session.end() self.session.replaced_session = None self.session = None def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.session: notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session = None self.session = None _NH_SIPSessionDidEnd = _NH_SIPSessionDidFail @implementer(IObserver) class Session(object): media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self.replaced_session = None self.transfer_handler = None self.transfer_info = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): notification_center = NotificationCenter() remote_sdp = invitation.sdp.proposed_remote self.proposed_streams = [] if remote_sdp: for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True if 'Referred-By' in data.headers or 'Replaces' in data.headers: self.transfer_info = TransferInfo() if 'Referred-By' in data.headers: self.transfer_info.referred_by = data.headers['Referred-By'].body if 'Replaces' in data.headers: replaces_header = data.headers.get('Replaces') # Because we only allow the remote tag to be optional, it can only match established dialogs and early outgoing dialogs, but not early incoming dialogs, # which according to RFC3891 should be rejected with 481 (which will happen automatically by never matching them). if replaces_header.early_only or replaces_header.from_tag == '0': replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=OptionalTag(replaces_header.from_tag)) else: replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) session_manager = SessionManager() try: replaced_session = next(session for session in session_manager.sessions if session.dialog_id == replaced_dialog_id) except StopIteration: invitation.send_response(481) return else: # Any matched dialog at this point is either established, terminated or early outgoing. if replaced_session.state in ('terminating', 'terminated'): invitation.send_response(603) return elif replaced_session.dialog_id.remote_tag is not None and replaces_header.early_only: # The replaced dialog is established, but the early-only flag is set invitation.send_response(486) return self.replaced_session = replaced_session self.transfer_info.replaced_dialog_id = replaced_dialog_id replace_handler = SessionReplaceHandler(self) replace_handler.start() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams[:], headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, to_header, routes, streams, is_focus=False, transfer_info=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = FromHeader(self.account.uri, self.account.display_name) self._remote_identity = to_header self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) self.transfer_info = transfer_info notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', sender=self, data=NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 try: contact_uri = self.account.contact[PublicGRUUIfAvailable, self.route] local_ip = host.outgoing_ip_for(self.route.address) if local_ip is None: raise ValueError("could not get outgoing IP address") except (KeyError, ValueError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) from_header = FromHeader(self.account.uri, self.account.display_name) route_header = RouteHeader(self.route.uri) contact_header = ContactHeader(contact_uri) if is_focus: contact_header.parameters['isfocus'] = None if self.transfer_info is not None: if self.transfer_info.referred_by is not None: extra_headers.append(Header('Referred-By', self.transfer_info.referred_by)) if self.transfer_info.replaced_dialog_id is not None: dialog_id = self.transfer_info.replaced_dialog_id extra_headers.append(ReplacesHeader(dialog_id.call_id, dialog_id.local_tag, dialog_id.remote_tag)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, self.account.credentials, extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: # TODO: check if port is also 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) for notification in invitation_notifications: self._channel.send(notification) while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed-out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: # TODO: we should know *exactly* when there are set -Saul code = getattr(e.data, 'code', 0) reason = getattr(e.data, 'reason', 'Session disconnected') if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and add_stream self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') if self.proposed_streams: for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') else: for index, stream in enumerate(streams): notification_center.add_observer(self, sender=stream) stream.index = index stream.initialize(self, direction='outgoing') self.proposed_streams = streams wait_count = len(self.proposed_streams) try: while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote sdp_connection = remote_sdp.connection or next((media.connection for media in remote_sdp.media if media.connection is not None)) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) if remote_sdp: stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) else: for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[PublicGRUU, self._invitation.transport] except KeyError: pass else: contact_header.uri = local_contact_uri if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: if not connected: # we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: if self._invitation.state == 'connecting': ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown' # pjsip's invite session object does not inform us whether the ACK was received or not notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received)) elif self._invitation.state == 'connected' and not connected: # we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed-out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=getattr(e.data, 'method', 'INVITE'), code=200, reason='OK')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received)) break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party self.greenlet = None except SIPCoreError as e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) finally: self.greenlet = None @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind of # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except api.TimeoutError: self._fail_proposal(originator='remote', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.streams = self.streams + streams self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown')) break except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code >= 300: proposed_streams = self.proposed_streams for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except api.TimeoutError: self._fail_proposal(originator='local', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 487: proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) except InvitationDisconnectedError as e: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: self.greenlet = None if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return elif self._invitation.state is None: # The invitation was built but never sent streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() stream.end() self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if notification.data.disconnect_reason in ('internal error', 'missing ACK'): pass elif notification.data.disconnect_reason == 'timeout': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local' if self.direction=='outgoing' else 'remote', method='INVITE', code=408, reason='Timeout')) elif cancelling: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif hasattr(notification.data, 'method'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200])) elif notification.data.disconnect_reason == 'user request': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason)) break except SIPCoreError as e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) except InvitationDisconnectedError as e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=code, reason=reason)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @check_state(['connected']) @check_transfer_state(None, None) @run_in_twisted_thread def transfer(self, target_uri, replaced_session=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferNewOutgoing', self, NotificationData(transfer_destination=target_uri)) try: self._invitation.transfer(target_uri, replaced_session.dialog_id if replaced_session is not None else None) except SIPCoreError as e: notification_center.post_notification('SIPSessionTransferDidFail', sender=self, data=NotificationData(code=500, reason=str(e))) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def accept_transfer(self): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidStart', sender=self) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def reject_transfer(self, code=603, reason=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidFail', self, NotificationData(code=code, reason=reason or sip_status_messages[code])) @property def dialog_id(self): return self._invitation.dialog_id if self._invitation is not None else None @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400<=code<=699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if prev_inv_state in ('connecting', 'connected'): if notification.data.disconnect_reason in ('timeout', 'missing ACK'): sip_code = 200 sip_reason = 'OK' originator = 'local' elif hasattr(notification.data, 'method'): sip_code = 200 sip_reason = 'OK' originator = 'remote' else: sip_code = notification.data.code sip_reason = notification.data.reason originator = 'local' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=originator, method='BYE', code=sip_code, reason=sip_reason)) elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'): ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=reason, ack_received=ack_received)) elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled')) break except SIPCoreError: pass except api.TimeoutError: if prev_inv_state in ('connecting', 'connected'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408])) notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if self.proposed_streams else 500) except SIPCoreError: pass else: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown')) notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: proposed_streams = self.proposed_streams self.proposed_streams = None self.state = 'connected' notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: self.state = 'terminated' notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if notification.data.originator == 'remote': if hasattr(notification.data, 'method'): notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200])) else: notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason)) self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() @implementer(IObserver) class SessionManager(object, metaclass=Singleton): def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: notification.sender.send_response(404) return notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification) diff --git a/sipsimple/video.py b/sipsimple/video.py index c5a1034f..22c4e9f2 100644 --- a/sipsimple/video.py +++ b/sipsimple/video.py @@ -1,79 +1,79 @@ """Video support""" __all__ = ['IVideoProducer', 'VideoDevice', 'VideoError'] -from .application.notification import NotificationCenter, NotificationData +from application.notification import NotificationCenter, NotificationData from zope.interface import Attribute, Interface, implementer from sipsimple.core import SIPCoreError, VideoCamera class IVideoProducer(Interface): """ Interface describing an object which can produce video data. All attributes of this interface are read-only. """ producer = Attribute("The core producer object which can be connected to a consumer") class VideoError(Exception): pass @implementer(IVideoProducer) class VideoDevice(object): def __init__(self, device_name, resolution, framerate): self._camera = self._open_camera(device_name, resolution, framerate) self._camera.start() def _open_camera(self, device_name, resolution, framerate): try: return VideoCamera(device_name, resolution, framerate) except SIPCoreError: try: return VideoCamera('system_default', resolution, framerate) except SIPCoreError: return VideoCamera(None, resolution, framerate) def set_camera(self, device_name, resolution, framerate): old_camera = self._camera old_camera.close() new_camera = self._open_camera(device_name, resolution, framerate) if not self.muted: new_camera.start() self._camera = new_camera notification_center = NotificationCenter() notification_center.post_notification('VideoDeviceDidChangeCamera', sender=self, data=NotificationData(old_camera=old_camera, new_camera=new_camera)) @property def producer(self): return self._camera @property def name(self): return self._camera.name @property def real_name(self): return self._camera.real_name @property def muted(self): return self.__dict__.get('muted', False) @muted.setter def muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for muted property: %r' % (value,)) if value == self.muted: return if value: self._camera.stop() else: self._camera.start() self.__dict__['muted'] = value