Page MenuHomePhabricator

No OneTemporary

This file is larger than 256 KB, so syntax highlighting was skipped.
diff --git a/sipsimple/account/__init__.py b/sipsimple/account/__init__.py
index a6934ac4..8c5f082d 100644
--- a/sipsimple/account/__init__.py
+++ b/sipsimple/account/__init__.py
@@ -1,888 +1,889 @@
"""
Implements a SIP Account management system that allows the definition of
multiple SIP accounts and their properties.
"""
__all__ = ['Account', 'BonjourAccount', 'AccountManager']
from itertools import chain
from threading import Lock
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import classproperty
from application.python.types import Singleton
from application.system import host as Host
from eventlib import coros, proc
from gnutls.crypto import X509Certificate, X509PrivateKey
from gnutls.errors import GNUTLSError
from gnutls.interfaces.twisted import X509Credentials
from zope.interface import implementer
from sipsimple.account.bonjour import BonjourServices, _bonjour
from sipsimple.account.publication import PresencePublisher, DialogPublisher
from sipsimple.account.registration import Registrar
from sipsimple.account.subscription import MWISubscriber, PresenceWinfoSubscriber, DialogWinfoSubscriber, PresenceSubscriber, SelfPresenceSubscriber, DialogSubscriber
from sipsimple.account.xcap import XCAPManager
from sipsimple.core import Credentials, SIPURI, ContactURIFactory
from sipsimple.configuration import ConfigurationManager, Setting, SettingsGroup, SettingsObject, SettingsObjectID
from sipsimple.configuration.datatypes import AudioCodecList, MSRPConnectionModel, MSRPRelayAddress, MSRPTransport, NonNegativeInteger, Path, SIPAddress, SIPProxyAddress, SRTPKeyNegotiation, STUNServerAddressList, VideoCodecList, XCAPRoot
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.payloads import ParserError
from sipsimple.payloads.messagesummary import MessageSummary
from sipsimple.payloads.pidf import PIDFDocument
from sipsimple.payloads.rlsnotify import RLSNotify
from sipsimple.payloads.watcherinfo import WatcherInfoDocument
from sipsimple.threading import call_in_thread
from sipsimple.threading.green import call_in_green_thread, run_in_green_thread
from sipsimple.util import user_info, execute_once
class AuthSettings(SettingsGroup):
username = Setting(type=str, default=None, nillable=True)
password = Setting(type=str, default='')
class SIPSettings(SettingsGroup):
always_use_my_proxy = Setting(type=bool, default=True)
outbound_proxy = Setting(type=SIPProxyAddress, default=None, nillable=True)
register = Setting(type=bool, default=True)
register_interval = Setting(type=NonNegativeInteger, default=600)
subscribe_interval = Setting(type=NonNegativeInteger, default=600)
publish_interval = Setting(type=NonNegativeInteger, default=600)
+ tls_name = Setting(type=str, default=None, nillable=True)
class SRTPEncryptionSettings(SettingsGroup):
enabled = Setting(type=bool, default=True)
key_negotiation = Setting(type=SRTPKeyNegotiation, default='opportunistic')
class RTPSettings(SettingsGroup):
audio_codec_list = Setting(type=AudioCodecList, default=None, nillable=True)
video_codec_list = Setting(type=VideoCodecList, default=None, nillable=True)
encryption = SRTPEncryptionSettings
class NATTraversalSettings(SettingsGroup):
use_ice = Setting(type=bool, default=False)
stun_server_list = Setting(type=STUNServerAddressList, default=None, nillable=True)
msrp_relay = Setting(type=MSRPRelayAddress, default=None, nillable=True)
use_msrp_relay_for_outbound = Setting(type=bool, default=False)
class MessageSummarySettings(SettingsGroup):
enabled = Setting(type=bool, default=False)
voicemail_uri = Setting(type=SIPAddress, default=None, nillable=True)
class XCAPSettings(SettingsGroup):
enabled = Setting(type=bool, default=False)
discovered = Setting(type=bool, default=False)
xcap_root = Setting(type=XCAPRoot, default=None, nillable=True)
xcap_diff = Setting(type=bool, default=True)
class PresenceSettings(SettingsGroup):
enabled = Setting(type=bool, default=False)
class TLSSettings(SettingsGroup):
ca_list = Setting(type=Path, default=None, nillable=True)
certificate = Setting(type=Path, default=None, nillable=True)
verify_server = Setting(type=bool, default=False)
class MSRPSettings(SettingsGroup):
transport = Setting(type=MSRPTransport, default='tls')
connection_model = Setting(type=MSRPConnectionModel, default='relay')
@implementer(IObserver)
class Account(SettingsObject):
"""
Object representing a SIP account. Contains configuration settings and
attributes for accessing SIP related objects.
When the account is active, it will register, publish its presence and
subscribe to watcher-info events depending on its settings.
If the object is un-pickled and its enabled flag was set, it will
automatically activate.
When the save method is called, depending on the value of the enabled flag,
the account will activate/deactivate.
Notifications sent by instances of Account:
* CFGSettingsObjectWasCreated
* CFGSettingsObjectWasActivated
* CFGSettingsObjectWasDeleted
* CFGSettingsObjectDidChange
* SIPAccountWillActivate
* SIPAccountDidActivate
* SIPAccountWillDeactivate
* SIPAccountDidDeactivate
"""
__group__ = 'Accounts'
__id__ = SettingsObjectID(type=SIPAddress)
id = __id__
enabled = Setting(type=bool, default=False)
display_name = Setting(type=str, default=None, nillable=True)
auth = AuthSettings
sip = SIPSettings
rtp = RTPSettings
nat_traversal = NATTraversalSettings
message_summary = MessageSummarySettings
msrp = MSRPSettings
presence = PresenceSettings
xcap = XCAPSettings
tls = TLSSettings
def __new__(cls, id):
#with AccountManager.load.lock:
# if not AccountManager.load.called:
# raise RuntimeError("cannot instantiate %s before calling AccountManager.load" % cls.__name__)
return SettingsObject.__new__(cls, id)
def __init__(self, id):
self.contact = ContactURIFactory()
self.xcap_manager = XCAPManager(self)
self._started = False
self._deleted = False
self._active = False
self._activation_lock = coros.Semaphore(1)
self._registrar = Registrar(self)
self._mwi_subscriber = MWISubscriber(self)
self._pwi_subscriber = PresenceWinfoSubscriber(self)
self._dwi_subscriber = DialogWinfoSubscriber(self)
self._presence_subscriber = PresenceSubscriber(self)
self._self_presence_subscriber = SelfPresenceSubscriber(self)
self._dialog_subscriber = DialogSubscriber(self)
self._presence_publisher = PresencePublisher(self)
self._dialog_publisher = DialogPublisher(self)
self._mwi_voicemail_uri = None
self._pwi_version = None
self._dwi_version = None
self._presence_version = None
self._dialog_version = None
self.trusted_cas = []
self.ca_list = None
def start(self):
if self._started or self._deleted:
return
self._started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.add_observer(self, name='XCAPManagerDidDiscoverServerCapabilities', sender=self.xcap_manager)
notification_center.add_observer(self, sender=self._mwi_subscriber)
notification_center.add_observer(self, sender=self._pwi_subscriber)
notification_center.add_observer(self, sender=self._dwi_subscriber)
notification_center.add_observer(self, sender=self._presence_subscriber)
notification_center.add_observer(self, sender=self._self_presence_subscriber)
notification_center.add_observer(self, sender=self._dialog_subscriber)
self.xcap_manager.init()
if self.enabled:
self._activate()
def stop(self):
if not self._started:
return
self._started = False
self._deactivate()
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.remove_observer(self, name='XCAPManagerDidDiscoverServerCapabilities', sender=self.xcap_manager)
notification_center.remove_observer(self, sender=self._mwi_subscriber)
notification_center.remove_observer(self, sender=self._pwi_subscriber)
notification_center.remove_observer(self, sender=self._dwi_subscriber)
notification_center.remove_observer(self, sender=self._presence_subscriber)
notification_center.remove_observer(self, sender=self._self_presence_subscriber)
notification_center.remove_observer(self, sender=self._dialog_subscriber)
def delete(self):
if self._deleted:
return
self._deleted = True
self.stop()
self._registrar = None
self._mwi_subscriber = None
self._pwi_subscriber = None
self._dwi_subscriber = None
self._presence_subscriber = None
self._self_presence_subscriber = None
self._dialog_subscriber = None
self._presence_publisher = None
self._dialog_publisher = None
self.xcap_manager = None
SettingsObject.delete(self)
@run_in_green_thread
def reregister(self):
if self._started:
self._registrar.reregister()
@run_in_green_thread
def resubscribe(self):
if self._started:
self._mwi_subscriber.resubscribe()
self._pwi_subscriber.resubscribe()
self._dwi_subscriber.resubscribe()
self._presence_subscriber.resubscribe()
self._self_presence_subscriber.resubscribe()
self._dialog_subscriber.resubscribe()
@property
def credentials(self):
username = self.auth.username or self.id.username
username = username.encode() if username else None
password = self.auth.password.encode() if self.auth.password else None
return Credentials(username, password)
@property
def registered(self):
try:
return self._registrar.registered
except AttributeError:
return False
@property
def mwi_active(self):
try:
return self._mwi_subscriber.subscribed
except AttributeError:
return False
@property
def tls_credentials(self):
# This property can be optimized to cache the credentials it loads from disk,
# however this is not a time consuming operation (~ 3000 req/sec). -Luci
settings = SIPSimpleSettings()
tls_certificate = self.tls.certificate or settings.tls.certificate
certificate = None
private_key = None
if tls_certificate is not None:
try:
certificate_data = open(tls_certificate.normalized).read()
certificate = X509Certificate(certificate_data)
private_key = X509PrivateKey(certificate_data)
except (FileNotFoundError, GNUTLSError, UnicodeDecodeError):
pass
trusted_cas = []
ca_list = self.tls.ca_list or settings.tls.ca_list
if ca_list is not None:
if len(self.trusted_cas) > 0:
trusted_cas = self.trusted_cas
else:
crt = None
start = False
try:
ca_text = open(ca_list.normalized).read()
except (FileNotFoundError, GNUTLSError, UnicodeDecodeError):
ca_text = ''
for line in ca_text.split("\n"):
if "BEGIN CERT" in line:
start = True
crt = line + "\n"
elif "END CERT" in line:
crt = crt + line + "\n"
end = True
start = False
try:
trusted_cas.append(X509Certificate(crt))
except (GNUTLSError, ValueError) as e:
continue
elif start:
crt = crt + line + "\n"
self.trusted_cas = trusted_cas
self.ca_list = ca_list
credentials = X509Credentials(certificate, private_key, trusted_cas)
credentials.verify_peer = self.tls.verify_server or settings.tls.certificate
return credentials
@property
def uri(self):
return SIPURI(user=self.id.username, host=self.id.domain)
@property
def voicemail_uri(self):
return self._mwi_voicemail_uri or self.message_summary.voicemail_uri
@property
def presence_state(self):
try:
return self._presence_publisher.state
except AttributeError:
return None
@presence_state.setter
def presence_state(self, state):
try:
self._presence_publisher.state = state
except AttributeError:
pass
@property
def dialog_state(self):
try:
return self._dialog_publisher.state
except AttributeError:
return None
@dialog_state.setter
def dialog_state(self, state):
try:
self._dialog_publisher.state = state
except AttributeError:
pass
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if self._started and 'enabled' in notification.data.modified:
if self.enabled:
self._activate()
else:
self._deactivate()
def _NH_XCAPManagerDidDiscoverServerCapabilities(self, notification):
if self._started and self.xcap.discovered is False:
self.xcap.discovered = True
self.save()
notification.center.post_notification('SIPAccountDidDiscoverXCAPSupport', sender=self)
def _NH_MWISubscriberDidDeactivate(self, notification):
self._mwi_voicemail_uri = None
def _NH_MWISubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == MessageSummary.content_type:
try:
message_summary = MessageSummary.parse(body)
except ParserError:
pass
else:
self._mwi_voicemail_uri = message_summary.message_account and SIPAddress(message_summary.message_account.replace('sip:', '', 1)) or None
notification.center.post_notification('SIPAccountGotMessageSummary', sender=self, data=NotificationData(message_summary=message_summary))
def _NH_PresenceWinfoSubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == WatcherInfoDocument.content_type:
try:
watcher_info = WatcherInfoDocument.parse(body)
watcher_list = watcher_info['sip:' + self.id]
except (ParserError, KeyError):
pass
else:
if watcher_list.package != 'presence':
return
if self._pwi_version is None:
if watcher_info.state == 'partial':
self._pwi_subscriber.resubscribe()
elif watcher_info.version <= self._pwi_version:
return
elif watcher_info.state == 'partial' and watcher_info.version > self._pwi_version + 1:
self._pwi_subscriber.resubscribe()
self._pwi_version = watcher_info.version
data = NotificationData(version=watcher_info.version, state=watcher_info.state, watcher_list=watcher_list)
notification.center.post_notification('SIPAccountGotPresenceWinfo', sender=self, data=data)
def _NH_PresenceWinfoSubscriptionDidEnd(self, notification):
self._pwi_version = None
def _NH_PresenceWinfoSubscriptionDidFail(self, notification):
self._pwi_version = None
def _NH_DialogWinfoSubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == WatcherInfoDocument.content_type:
try:
watcher_info = WatcherInfoDocument.parse(body)
watcher_list = watcher_info['sip:' + self.id]
except (ParserError, KeyError):
pass
else:
if watcher_list.package != 'dialog':
return
if self._dwi_version is None:
if watcher_info.state == 'partial':
self._dwi_subscriber.resubscribe()
elif watcher_info.version <= self._dwi_version:
return
elif watcher_info.state == 'partial' and watcher_info.version > self._dwi_version + 1:
self._dwi_subscriber.resubscribe()
self._dwi_version = watcher_info.version
data = NotificationData(version=watcher_info.version, state=watcher_info.state, watcher_list=watcher_list)
notification.center.post_notification('SIPAccountGotDialogWinfo', sender=self, data=data)
def _NH_DialogWinfoSubscriptionDidEnd(self, notification):
self._dwi_version = None
def _NH_DialogWinfoSubscriptionDidFail(self, notification):
self._dwi_version = None
def _NH_PresenceSubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == RLSNotify.content_type:
try:
rls_notify = RLSNotify.parse('{content_type}\r\n\r\n{body}'.format(content_type=notification.data.headers['Content-Type'], body=body))
except ParserError:
pass
else:
if rls_notify.uri != self.xcap_manager.rls_presence_uri:
return
if self._presence_version is None:
if not rls_notify.full_state:
self._presence_subscriber.resubscribe()
elif rls_notify.version <= self._presence_version:
return
elif not rls_notify.full_state and rls_notify.version > self._presence_version + 1:
self._presence_subscriber.resubscribe()
self._presence_version = rls_notify.version
data = NotificationData(version=rls_notify.version, full_state=rls_notify.full_state, resource_map=dict((str(resource.uri), resource) for resource in rls_notify))
notification.center.post_notification('SIPAccountGotPresenceState', sender=self, data=data)
def _NH_PresenceSubscriptionDidEnd(self, notification):
self._presence_version = None
def _NH_PresenceSubscriptionDidFail(self, notification):
self._presence_version = None
def _NH_SelfPresenceSubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == PIDFDocument.content_type:
try:
pidf_doc = PIDFDocument.parse(body)
except ParserError:
pass
else:
if pidf_doc.entity.partition('sip:')[2] != self.id:
return
notification.center.post_notification('SIPAccountGotSelfPresenceState', sender=self, data=NotificationData(pidf=pidf_doc))
def _NH_DialogSubscriptionGotNotify(self, notification):
body = notification.data.body.decode() if notification.data.body else None
if body and notification.data.content_type == RLSNotify.content_type:
try:
rls_notify = RLSNotify.parse('{content_type}\r\n\r\n{body}'.format(content_type=notification.data.headers['Content-Type'], body=body))
except ParserError:
pass
else:
if rls_notify.uri != self.xcap_manager.rls_dialog_uri:
return
if self._dialog_version is None:
if not rls_notify.full_state:
self._dialog_subscriber.resubscribe()
elif rls_notify.version <= self._dialog_version:
return
elif not rls_notify.full_state and rls_notify.version > self._dialog_version + 1:
self._dialog_subscriber.resubscribe()
self._dialog_version = rls_notify.version
data = NotificationData(version=rls_notify.version, full_state=rls_notify.full_state, resource_map=dict((resource.uri, resource) for resource in rls_notify))
notification.center.post_notification('SIPAccountGotDialogState', sender=self, data=data)
def _NH_DialogSubscriptionDidEnd(self, notification):
self._dialog_version = None
def _NH_DialogSubscriptionDidFail(self, notification):
self._dialog_version = None
def _activate(self):
with self._activation_lock:
if self._active:
return
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountWillActivate', sender=self)
self._active = True
self._registrar.start()
self._mwi_subscriber.start()
self._pwi_subscriber.start()
self._dwi_subscriber.start()
self._presence_subscriber.start()
self._self_presence_subscriber.start()
self._dialog_subscriber.start()
self._presence_publisher.start()
self._dialog_publisher.start()
if self.xcap.enabled:
self.xcap_manager.start()
notification_center.post_notification('SIPAccountDidActivate', sender=self)
def _deactivate(self):
with self._activation_lock:
if not self._active:
return
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountWillDeactivate', sender=self)
self._active = False
handlers = [self._registrar, self._mwi_subscriber, self._pwi_subscriber, self._dwi_subscriber,
self._presence_subscriber, self._self_presence_subscriber, self._dialog_subscriber,
self._presence_publisher, self._dialog_publisher, self.xcap_manager]
proc.waitall([proc.spawn(handler.stop) for handler in handlers])
notification_center.post_notification('SIPAccountDidDeactivate', sender=self)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.id)
def __setstate__(self, data):
# This restores the password from its previous location as a top level setting
# after it was moved under the auth group.
SettingsObject.__setstate__(self, data)
if not data.get('auth', {}).get('password') and data.get('password'):
self.auth.password = data.pop('password')
self.save()
class BonjourMSRPSettings(SettingsGroup):
transport = Setting(type=MSRPTransport, default='tls')
class BonjourAccountEnabledSetting(Setting):
def __get__(self, obj, objtype):
if obj is None:
return self
return _bonjour.available and self.values.get(obj, self.default)
def __set__(self, obj, value):
if not _bonjour.available:
raise RuntimeError('mdns support is not available')
Setting.__set__(self, obj, value)
@implementer(IObserver)
class BonjourAccount(SettingsObject):
"""
Object representing a bonjour account. Contains configuration settings and
attributes for accessing bonjour related options.
When the account is active, it will send broadcast its contact address on
the LAN.
If the object is un-pickled and its enabled flag was set, it will
automatically activate.
When the save method is called, depending on the value of the enabled flag,
the account will activate/deactivate.
Notifications sent by instances of Account:
* CFGSettingsObjectWasCreated
* CFGSettingsObjectWasActivated
* CFGSettingsObjectWasDeleted
* CFGSettingsObjectDidChange
* SIPAccountWillActivate
* SIPAccountDidActivate
* SIPAccountWillDeactivate
* SIPAccountDidDeactivate
"""
__group__ = 'Accounts'
__id__ = SIPAddress('bonjour@local')
id = property(lambda self: self.__id__)
enabled = BonjourAccountEnabledSetting(type=bool, default=True)
display_name = Setting(type=str, default=user_info.fullname, nillable=False)
msrp = BonjourMSRPSettings
presence = PresenceSettings
rtp = RTPSettings
tls = TLSSettings
def __new__(cls):
# with AccountManager.load.lock:
# if not AccountManager.load.called:
# raise RuntimeError("cannot instantiate %s before calling AccountManager.load" % cls.__name__)
return SettingsObject.__new__(cls)
def __init__(self):
self.contact = ContactURIFactory()
self.credentials = None
self._started = False
self._active = False
self._activation_lock = coros.Semaphore(1)
self._bonjour_services = BonjourServices(self)
# initialize fake settings (these are here to make the bonjour account quack like a duck)
self.nat_traversal = NATTraversalSettings()
self.nat_traversal.use_ice = False
self.nat_traversal.msrp_relay = None
self.nat_traversal.use_msrp_relay_for_outbound = False
self.xcap = XCAPSettings()
self.xcap.enabled = False
self.xcap.discovered = False
self.xcap.xcap_root = None
def __repr__(self):
return '%s()' % self.__class__.__name__
def start(self):
if self._started:
return
self._started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
self._bonjour_services.start()
if self.enabled:
self._activate()
def stop(self):
if not self._started:
return
self._started = False
self._deactivate()
self._bonjour_services.stop()
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
@classproperty
def mdns_available(cls):
return _bonjour.available
@property
def registered(self):
return False
@property
def tls_credentials(self):
# This property can be optimized to cache the credentials it loads from disk,
# however this is not a time consuming operation (~ 3000 req/sec). -Luci
settings = SIPSimpleSettings()
tls_certificate = self.tls.certificate or settings.tls.certificate
if tls_certificate is not None:
certificate_data = open(tls_certificate.normalized).read()
certificate = X509Certificate(certificate_data)
private_key = X509PrivateKey(certificate_data)
else:
certificate = None
private_key = None
credentials = X509Credentials(certificate, private_key, [])
credentials.verify_peer = False
return credentials
@property
def uri(self):
return SIPURI(user=self.contact.username, host=Host.default_ip or '127.0.0.1')
@property
def presence_state(self):
return self._bonjour_services.presence_state
@presence_state.setter
def presence_state(self, state):
self._bonjour_services.presence_state = state
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if self._started:
if 'enabled' in notification.data.modified:
if self.enabled:
self._activate()
else:
self._deactivate()
elif self.enabled:
if 'display_name' in notification.data.modified:
self._bonjour_services.update_registrations()
if {'sip.transport_list', 'tls.certificate'}.intersection(notification.data.modified):
self._bonjour_services.update_registrations()
self._bonjour_services.restart_discovery()
def _activate(self):
with self._activation_lock:
if self._active:
return
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountWillActivate', sender=self)
self._active = True
self._bonjour_services.activate()
notification_center.post_notification('SIPAccountDidActivate', sender=self)
def _deactivate(self):
with self._activation_lock:
if not self._active:
return
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountWillDeactivate', sender=self)
self._active = False
self._bonjour_services.deactivate()
notification_center.post_notification('SIPAccountDidDeactivate', sender=self)
@implementer(IObserver)
class AccountManager(object, metaclass=Singleton):
"""
This is a singleton object which manages all the SIP accounts. It is
also used to manage the default account (the one used for outbound
sessions) using the default_account attribute:
manager = AccountManager()
manager.default_account = manager.get_account('alice@example.net')
The following notifications are sent:
* SIPAccountManagerDidRemoveAccount
* SIPAccountManagerDidAddAccount
* SIPAccountManagerDidChangeDefaultAccount
"""
def __init__(self):
self._lock = Lock()
self.accounts = {}
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectWasActivated')
notification_center.add_observer(self, name='CFGSettingsObjectWasCreated')
@execute_once
def load(self):
"""
Load all accounts from the configuration. The accounts will not be
started until the start method is called.
"""
configuration = ConfigurationManager()
bonjour_account = BonjourAccount()
names = configuration.get_names([Account.__group__])
[Account(id) for id in names if id != bonjour_account.id]
default_account = self.default_account
if default_account is None or not default_account.enabled:
try:
self.default_account = next((account for account in list(self.accounts.values()) if account.enabled))
except StopIteration:
self.default_account = None
def start(self):
"""
Start the accounts, which will determine the ones with the enabled flag
set to activate.
"""
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountManagerWillStart', sender=self)
proc.waitall([proc.spawn(account.start) for account in list(self.accounts.values())])
notification_center.post_notification('SIPAccountManagerDidStart', sender=self)
def stop(self):
"""
Stop the accounts, which will determine the ones that were enabled to
deactivate. This method returns only once the accounts were stopped
successfully or they timed out trying.
"""
notification_center = NotificationCenter()
notification_center.post_notification('SIPAccountManagerWillEnd', sender=self)
proc.waitall([proc.spawn(account.stop) for account in list(self.accounts.values())])
notification_center.post_notification('SIPAccountManagerDidEnd', sender=self)
def has_account(self, id):
return id in self.accounts
def get_account(self, id):
return self.accounts[id]
def get_accounts(self):
return list(self.accounts.values())
def iter_accounts(self):
return iter(list(self.accounts.values()))
def find_account(self, contact_uri):
# compare contact_address with account contact
exact_matches = (account for account in list(self.accounts.values()) if account.enabled and account.contact.username==contact_uri.user)
# compare username in contact URI with account username
loose_matches = (account for account in list(self.accounts.values()) if account.enabled and account.id.username==contact_uri.user)
return next(chain(exact_matches, loose_matches, [None]))
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_CFGSettingsObjectWasActivated(self, notification):
if isinstance(notification.sender, Account) or (isinstance(notification.sender, BonjourAccount) and _bonjour.available):
account = notification.sender
self.accounts[account.id] = account
notification.center.add_observer(self, sender=account, name='CFGSettingsObjectDidChange')
notification.center.add_observer(self, sender=account, name='CFGSettingsObjectWasDeleted')
notification.center.post_notification('SIPAccountManagerDidAddAccount', sender=self, data=NotificationData(account=account))
from sipsimple.application import SIPApplication
if SIPApplication.running:
call_in_green_thread(account.start)
def _NH_CFGSettingsObjectWasCreated(self, notification):
if isinstance(notification.sender, Account):
account = notification.sender
if account.enabled and self.default_account is None:
self.default_account = account
def _NH_CFGSettingsObjectWasDeleted(self, notification):
account = notification.sender
del self.accounts[account.id]
notification.center.remove_observer(self, sender=account, name='CFGSettingsObjectDidChange')
notification.center.remove_observer(self, sender=account, name='CFGSettingsObjectWasDeleted')
notification.center.post_notification('SIPAccountManagerDidRemoveAccount', sender=self, data=NotificationData(account=account))
def _NH_CFGSettingsObjectDidChange(self, notification):
account = notification.sender
if '__id__' in notification.data.modified:
modified_id = notification.data.modified['__id__']
self.accounts[modified_id.new] = self.accounts.pop(modified_id.old)
if 'enabled' in notification.data.modified:
if account.enabled and self.default_account is None:
self.default_account = account
elif not account.enabled and self.default_account is account:
try:
self.default_account = next((account for account in list(self.accounts.values()) if account.enabled))
except StopIteration:
self.default_account = None
@property
def default_account(self):
settings = SIPSimpleSettings()
return self.accounts.get(settings.default_account, None)
@default_account.setter
def default_account(self, account):
if account is not None and not account.enabled:
raise ValueError("account %s is not enabled" % account.id)
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
with self._lock:
old_account = self.accounts.get(settings.default_account, None)
if account is old_account:
return
if account is None:
settings.default_account = None
else:
settings.default_account = account.id
settings.save()
# we need to post the notification in the file-io thread in order to have it serialized after the
# SIPAccountManagerDidAddAccount notification that is triggered when the account is saved the first
# time, because save is executed in the file-io thread while this runs in the current thread. -Dan
call_in_thread('file-io', notification_center.post_notification, 'SIPAccountManagerDidChangeDefaultAccount', sender=self, data=NotificationData(old_account=old_account, account=account))
diff --git a/sipsimple/account/publication.py b/sipsimple/account/publication.py
index d25649dd..c4c7782b 100644
--- a/sipsimple/account/publication.py
+++ b/sipsimple/account/publication.py
@@ -1,398 +1,398 @@
"""Implements the publisher handlers"""
__all__ = ['Publisher', 'PresencePublisher', 'DialogPublisher']
import random
from abc import ABCMeta, abstractproperty
from threading import Lock
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.types import MarkerType
from application.system import host as Host
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import FromHeader, Publication, PublicationETagError, RouteHeader, SIPURI, SIPCoreError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads.dialoginfo import DialogInfoDocument
from sipsimple.payloads.pidf import PIDFDocument
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('publish', refresh_interval=None)
class SameState(metaclass=MarkerType): pass
class SIPPublicationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPPublicationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class PublicationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class PublisherNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-9] if name.endswith('Publisher') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
@implementer(IObserver)
class Publisher(object, metaclass=ABCMeta):
__nickname__ = PublisherNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.publishing = False
self._lock = Lock()
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._publication = None
self._dns_wait = 1
self._publish_wait = 1
self._publication_timer = None
self.__dict__['state'] = None
@abstractproperty
def event(self):
return None
@abstractproperty
def payload_type(self):
return None
@property
def extra_headers(self):
return []
@property
def state(self):
return self.__dict__['state']
@state.setter
def state(self, state):
if state is not None and not isinstance(state, self.payload_type.root_element):
raise ValueError("state must be a %s document or None" % self.payload_type.root_element.__name__)
with self._lock:
old_state = self.__dict__['state']
self.__dict__['state'] = state
if state == old_state:
return
self._publish(state)
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('publish', state=self.state))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unpublish'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
@run_in_twisted_thread
def _publish(self, state):
if not self.active:
return
if state is None:
self._command_channel.send(Command('unpublish'))
else:
self._command_channel.send(Command('publish', state=state))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_publish(self, command):
if command.state is None or self._publication is None and command.state is SameState:
command.signal()
return
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
if self._publication is None:
duration = command.refresh_interval or self.account.sip.publish_interval
from_header = FromHeader(self.account.uri, self.account.display_name)
self._publication = Publication(from_header, self.event, self.payload_type.content_type, credentials=self.account.credentials, duration=duration, extra_headers=self.extra_headers)
notification_center.add_observer(self, sender=self._publication)
notification_center.post_notification(self.__class__.__name__ + 'WillPublish', sender=self, data=NotificationData(state=command.state, duration=duration))
else:
notification_center.post_notification(self.__class__.__name__ + 'WillRefresh', sender=self, data=NotificationData(state=command.state))
try:
if Host.default_ip is None:
raise PublicationError('No IP address', retry_after=60)
# Lookup routes
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
+ routes = lookup.lookup_sip_proxy(uri, valid_transports, tls_name=self.account.sip.tls_name).wait()
except DNSLookupError as e:
retry_after = random.uniform(self._dns_wait, 2*self._dns_wait)
self._dns_wait = limit(2*self._dns_wait, max=30)
raise PublicationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
body = None if command.state is SameState else command.state.toxml()
# Publish by trying each route in turn
publish_timeout = time() + 30
for route in routes:
if Host.default_ip is None:
raise PublicationError('No IP address', retry_after=60)
remaining_time = publish_timeout-time()
if remaining_time > 0:
try:
try:
self._publication.publish(body, RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
except ValueError as e: # this happens for an initial PUBLISH with body=None
raise PublicationError(str(e), retry_after=0)
except PublicationETagError:
state = self.state # access self.state only once to avoid race conditions
if state is not None:
self._publication.publish(state.toxml(), RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
else:
command.signal()
return
except SIPCoreError:
raise PublicationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidSucceed':
break
if notification.name == 'SIPPublicationDidEnd':
raise PublicationError('Publication expired', retry_after=random.uniform(60, 120)) # publication expired while we were trying to re-publish
except SIPPublicationDidFail as e:
if e.data.code == 407:
# Authentication failed, so retry the publication in some time
raise PublicationError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 412:
raise PublicationError('Conditional request failed', retry_after=0)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.publish_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise PublicationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise PublicationError('Method or event not supported', retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.publishing = True
self._publish_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the publication
retry_after = random.uniform(self._publish_wait, 2*self._publish_wait)
self._publish_wait = limit(self._publish_wait*2, max=30)
raise PublicationError('No more routes to try', retry_after=retry_after)
except PublicationError as e:
self.publishing = False
notification_center.remove_observer(self, sender=self._publication)
def publish(e):
if self.active:
self._command_channel.send(Command('publish', event=command.event, state=self.state, refresh_interval=e.refresh_interval))
else:
command.signal()
self._publication_timer = None
self._publication_timer = reactor.callLater(e.retry_after, publish, e)
self._publication = None
notification_center.post_notification(self.__nickname__ + 'PublicationDidFail', sender=self, data=NotificationData(reason=e.error))
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidSucceed', sender=self)
def _CH_unpublish(self, command):
# Cancel any timer which would restart the publication process
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
publishing = self.publishing
self.publishing = False
if self._publication is not None:
notification_center = NotificationCenter()
if publishing:
self._publication.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidEnd':
break
except (SIPPublicationDidFail, SIPPublicationDidNotEnd):
notification_center.post_notification(self.__nickname__ + 'PublicationDidNotEnd', sender=self)
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidEnd', sender=self)
notification_center.remove_observer(self, sender=self._publication)
self._publication = None
command.signal()
def _CH_terminate(self, command):
self._CH_unpublish(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPPublicationDidSucceed(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidFail(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidFail(notification.data))
def _NH_SIPPublicationDidEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidNotEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidNotEnd(notification.data))
def _NH_SIPPublicationWillExpire(self, notification):
if notification.sender is self._publication:
self._publish(SameState)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.outbound_proxy', 'sip.transport_list', 'sip.publish_interval'}.intersection(notification.data.modified):
self._command_channel.send(Command('unpublish'))
self._command_channel.send(Command('publish', state=self.state))
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('unpublish'))
self._command_channel.send(Command('publish', state=self.state))
class PresencePublisher(Publisher):
"""A publisher for presence state"""
@property
def event(self):
return 'presence'
@property
def payload_type(self):
return PIDFDocument
def _NH_PresencePublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
class DialogPublisher(Publisher):
"""A publisher for dialog info state"""
@property
def event(self):
return 'dialog'
@property
def payload_type(self):
return DialogInfoDocument
def _NH_DialogPublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
diff --git a/sipsimple/account/registration.py b/sipsimple/account/registration.py
index 3798c57a..3b955709 100644
--- a/sipsimple/account/registration.py
+++ b/sipsimple/account/registration.py
@@ -1,324 +1,324 @@
"""Implements the registration handler"""
__all__ = ['Registrar']
import random
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.system import host as Host
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import ContactHeader, FromHeader, Header, Registration, RouteHeader, SIPURI, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('register', refresh_interval=None)
class SIPRegistrationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPRegistrationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class RegistrationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
@implementer(IObserver)
class Registrar(object):
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.registered = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._registration = None
self._dns_wait = 1
self._register_wait = 1
self._registration_timer = None
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
if self.account.sip.register:
self.activate()
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('register'))
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unregister'))
def reregister(self):
if self.active:
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def _run(self):
while True:
command = self._command_channel.wait()
#print('Registrar for %s got command %s' % (self.account.id, command.name))
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_register(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
try:
if Host.default_ip is None:
raise RegistrationError('No IP address', retry_after=60)
# Initialize the registration
if self._registration is None:
duration = command.refresh_interval or self.account.sip.register_interval
try:
self._registration = Registration(FromHeader(self.account.uri, self.account.display_name),
credentials=self.account.credentials,
duration=duration,
extra_headers=[Header('Supported', 'gruu')])
except Exception as e:
raise RegistrationError('Cannot create registration: %s' % str(e), retry_after=120)
notification_center.add_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountWillRegister', sender=self.account)
else:
notification_center.post_notification('SIPAccountRegistrationWillRefresh', sender=self.account)
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
+ routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list, tls_name=self.account.sip.tls_name).wait()
except DNSLookupError as e:
retry_after = int(random.uniform(self._dns_wait, 2*self._dns_wait))
self._dns_wait = limit(2*self._dns_wait, max=30)
raise RegistrationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
# Register by trying each route in turn
register_timeout = time() + 30
i = 0
for route in routes:
i += 1
remaining_time = register_timeout-time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
contact_header = ContactHeader(contact_uri)
instance_id = '"<%s>"' % settings.instance_id
contact_header.parameters[b"+sip.instance"] = instance_id.encode()
if self.account.nat_traversal.use_ice:
contact_header.parameters[b"+sip.ice"] = None
route_header = RouteHeader(route.uri)
try:
self._registration.register(contact_header, route_header, timeout=limit(remaining_time, min=1, max=10))
except SIPCoreError:
raise RegistrationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidSucceed':
break
if notification.name == 'SIPRegistrationDidEnd':
raise RegistrationError('Registration expired', retry_after=int(random.uniform(60, 120))) # registration expired while we were trying to re-register
except SIPRegistrationDidFail as e:
notification_data = NotificationData(code=e.data.code, reason=e.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
if e.data.code == 401:
# Authentication failed, so retry the registration in some time
raise RegistrationError('Authentication failed', retry_after=int(random.uniform(60, 120)))
elif e.data.code == 408:
# Timeout
raise RegistrationError('Request timeout', retry_after=int(random.uniform(15, 40)))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.register_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise RegistrationError('Interval too short', retry_after=int(random.uniform(60, 120)), refresh_interval=refresh_interval)
else:
if i == len(routes):
raise RegistrationError(e.data.reason, retry_after=int(random.uniform(15, 40)))
else:
# Otherwise just try the next route
continue
else:
notification_data = NotificationData(code=notification.data.code, reason=notification.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
self.registered = True
# Save GRUU
try:
header = next(header for header in notification.data.contact_header_list if header.parameters.get('+sip.instance', '').strip('"<>') == settings.instance_id)
except StopIteration:
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
else:
public_gruu = header.parameters.get('pub-gruu', None)
temporary_gruu = header.parameters.get('temp-gruu', None)
try:
self.account.contact.public_gruu = SIPURI.parse(public_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.public_gruu = None
try:
self.account.contact.temporary_gruu = SIPURI.parse(temporary_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.temporary_gruu = None
notification_data = NotificationData(contact_header=notification.data.contact_header,
contact_header_list=notification.data.contact_header_list,
expires=notification.data.expires_in, registrar=route)
notification_center.post_notification('SIPAccountRegistrationDidSucceed', sender=self.account, data=notification_data)
self._register_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the registration
retry_after = int(random.uniform(self._register_wait, 2*self._register_wait))
self._register_wait = limit(self._register_wait*2, max=30)
raise RegistrationError('No more routes to try', retry_after=retry_after)
except RegistrationError as e:
self.registered = False
notification_center.discard_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountRegistrationDidFail', sender=self.account, data=NotificationData(error=e.error, retry_after=e.retry_after))
def register(e):
if self.active:
self._command_channel.send(Command('register', command.event, refresh_interval=e.refresh_interval))
self._registration_timer = None
self._registration_timer = reactor.callLater(e.retry_after, register, e)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
def _CH_unregister(self, command):
# Cancel any timer which would restart the registration process
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
registered = self.registered
self.registered = False
if self._registration is not None:
notification_center = NotificationCenter()
if registered:
self._registration.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidEnd':
break
except (SIPRegistrationDidFail, SIPRegistrationDidNotEnd) as e:
notification_center.post_notification('SIPAccountRegistrationDidNotEnd', sender=self.account, data=NotificationData(code=e.data.code, reason=e.data.reason,
registration=self._registration))
else:
notification_center.post_notification('SIPAccountRegistrationDidEnd', sender=self.account, data=NotificationData(registration=self._registration))
notification_center.remove_observer(self, sender=self._registration)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
command.signal()
def _CH_terminate(self, command):
self._CH_unregister(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRegistrationDidSucceed(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidFail(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidFail(notification.data))
def _NH_SIPRegistrationDidEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidNotEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidNotEnd(notification.data))
def _NH_SIPRegistrationWillExpire(self, notification):
if self.active:
self._command_channel.send(Command('register'))
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'sip.register' in notification.data.modified:
if self.account.sip.register:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'nat_traversal.use_ice', 'sip.outbound_proxy', 'sip.transport_list', 'sip.register_interval'}.intersection(notification.data.modified):
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
diff --git a/sipsimple/account/subscription.py b/sipsimple/account/subscription.py
index a88dbbc9..058ca5bc 100644
--- a/sipsimple/account/subscription.py
+++ b/sipsimple/account/subscription.py
@@ -1,502 +1,502 @@
"""Implements the subscription handlers"""
__all__ = ['Subscriber', 'MWISubscriber', 'PresenceWinfoSubscriber', 'DialogWinfoSubscriber', 'PresenceSubscriber', 'SelfPresenceSubscriber', 'DialogSubscriber']
import random
from abc import ABCMeta, abstractproperty
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.system import host as Host
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import ContactHeader, FromHeader, Header, RouteHeader, SIPURI, Subscription, ToHeader, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('subscribe', refresh_interval=None)
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class InterruptSubscription(Exception): pass
class TerminateSubscription(Exception): pass
class Content(object):
def __init__(self, body, type):
self.body = body
self.type = type
class SubscriberNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-10] if name.endswith('Subscriber') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
@implementer(IObserver)
class Subscriber(object, metaclass=ABCMeta):
__nickname__ = SubscriberNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
@abstractproperty
def event(self):
return None
@property
def subscription_uri(self):
return self.account.id
@property
def content(self):
return Content(None, None)
@property
def extra_headers(self):
return []
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('subscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unsubscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
def resubscribe(self):
if self.active:
self._command_channel.send(Command('subscribe'))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
self._CH_unsubscribe(command)
raise proc.ProcExit
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
subscription_uri = self.subscription_uri
refresh_interval = command.refresh_interval or self.account.sip.subscribe_interval
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
try:
if Host.default_ip is None:
raise SubscriptionError('No IP address', retry_after=60)
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
elif self.account.sip.always_use_my_proxy:
uri = SIPURI(host=self.account.id.domain)
else:
uri = SIPURI(host=subscription_uri.domain)
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
+ routes = lookup.lookup_sip_proxy(uri, valid_transports, tls_name=self.account.sip.tls_name).wait()
except DNSLookupError as e:
raise SubscriptionError('DNS lookup failed: %s' % e, retry_after=random.uniform(15, 30))
subscription_uri = SIPURI(user=subscription_uri.username, host=subscription_uri.domain)
content = self.content
timeout = time() + 30
for route in routes:
if Host.default_ip is None:
raise SubscriptionError('No IP address', retry_after=60)
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
subscription = Subscription(subscription_uri, FromHeader(self.account.uri, self.account.display_name),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
self.event.encode(),
RouteHeader(route.uri),
credentials=self.account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(body=content.body, content_type=content.type, extra_headers=self.extra_headers, timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError('Internal error', retry_after=5)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail as e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.subscribe_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise SubscriptionError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError('Method or event not supported', retry_after=3600)
elif e.data.code == 1400:
raise SubscriptionError(e.data.reason, retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
raise SubscriptionError('No more routes to try', retry_after=random.uniform(60, 180))
# At this point it is subscribed. Handle notifications and ending/failures.
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidStart', sender=self)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionGotNotify':
notification_center.post_notification(self.__nickname__ + 'SubscriptionGotNotify', sender=self, data=notification.data)
elif notification.name == 'SIPSubscriptionDidEnd':
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='remote'))
if self.active:
self._command_channel.send(Command('subscribe'))
break
except SIPSubscriptionDidFail:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
if self.active:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
finally:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except TerminateSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except SubscriptionError as e:
def subscribe(e):
if self.active:
self._command_channel.send(Command('subscribe', command.event, refresh_interval=e.refresh_interval))
self._subscription_timer = None
self._subscription_timer = reactor.callLater(e.retry_after, subscribe, e)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
if notification.sender is self._subscription:
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('subscribe'))
class MWISubscriber(Subscriber):
"""Message Waiting Indicator subscriber"""
@property
def event(self):
return 'message-summary'
@property
def subscription_uri(self):
return self.account.message_summary.voicemail_uri or self.account.id
def _NH_MWISubscriberWillStart(self, notification):
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberDidStart(self, notification):
if self.account.message_summary.enabled:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'message_summary.enabled' in notification.data.modified:
if self.account.message_summary.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'message_summary.voicemail_uri', 'sip.always_use_my_proxy', 'sip.outbound_proxy',
'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class AbstractPresenceSubscriber(Subscriber):
"""Abstract class defining behavior for all presence subscribers"""
__transports__ = frozenset(['tls', 'tcp'])
def _NH_AbstractPresenceSubscriberWillStart(self, notification):
notification.center.add_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberDidStart(self, notification):
if self.account.presence.enabled and self.account.xcap.discovered:
self.activate()
def _NH_SIPAccountDidDiscoverXCAPSupport(self, notification):
if self.account.presence.enabled and not self.active:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started or not self.account.xcap.discovered:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.always_use_my_proxy', 'sip.outbound_proxy', 'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class PresenceWinfoSubscriber(AbstractPresenceSubscriber):
"""Presence Watcher Info subscriber"""
_NH_PresenceWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence.winfo'
class DialogWinfoSubscriber(AbstractPresenceSubscriber):
"""Dialog Watcher Info subscriber"""
_NH_DialogWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog.winfo'
class PresenceSubscriber(AbstractPresenceSubscriber):
"""Presence subscriber"""
_NH_PresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_presence_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
class SelfPresenceSubscriber(AbstractPresenceSubscriber):
"""Self presence subscriber"""
_NH_SelfPresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_SelfPresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_SelfPresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.id
class DialogSubscriber(AbstractPresenceSubscriber):
"""Dialog subscriber"""
_NH_DialogSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_dialog_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
diff --git a/sipsimple/lookup.py b/sipsimple/lookup.py
index 27672da7..4093fb74 100644
--- a/sipsimple/lookup.py
+++ b/sipsimple/lookup.py
@@ -1,575 +1,577 @@
"""
Implements DNS lookups in the context of SIP, STUN and MSRP relay based
on RFC3263 and related standards. This can be used to determine the next
hop(s) and failover for routing of SIP messages and reservation of network
resources prior the starting of a SIP session.
"""
import re
from itertools import chain
from time import time
from urllib.parse import urlparse
# patch dns.entropy module which is not thread-safe
import dns
import sys
from functools import partial
from random import randint, randrange
dns.entropy = dns.__class__('dns.entropy')
dns.entropy.__file__ = dns.__file__.replace('__init__.py', 'entropy.py')
dns.entropy.__builtins__ = dns.__builtins__
dns.entropy.random_16 = partial(randrange, 2**16)
dns.entropy.between = randint
sys.modules['dns.entropy'] = dns.entropy
del partial, randint, randrange, sys
# replace standard select and socket modules with versions from eventlib
from eventlib import coros, proc
from eventlib.green import select
from eventlib.green import socket
import dns.name
import dns.resolver
import dns.query
dns.resolver.socket = socket
dns.query.socket = socket
dns.query.select = select
#TODO3: dnspython newer than 0.20 has a new API
if ('_set_polling_backend' in dir(dns.query)):
dns.query._set_polling_backend(dns.query._select_for)
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.decorator import decorator, preserve_signature
from application.python.types import Singleton
from dns import exception, rdatatype
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import Route
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, InterruptCommand, run_in_waitable_green_thread
def domain_iterator(domain):
"""
A generator which returns the domain and its parent domains.
"""
while domain not in ('.', ''):
yield domain
domain = (domain.split('.', 1)+[''])[1]
@decorator
def post_dns_lookup_notifications(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
notification_center = NotificationCenter()
try:
result = func(obj, *args, **kwargs)
except DNSLookupError as e:
notification_center.post_notification('DNSLookupDidFail', sender=obj, data=NotificationData(error=str(e)))
raise
else:
notification_center.post_notification('DNSLookupDidSucceed', sender=obj, data=NotificationData(result=result))
return result
return wrapper
class DNSLookupError(Exception):
"""
The error raised by DNSLookup when a lookup cannot be performed.
"""
class DNSCache(object):
"""
A simple DNS cache which uses twisted's timers to invalidate its expired
data.
"""
def __init__(self):
self.data = {}
def get(self, key):
return self.data.get(key, None)
def put(self, key, value):
expiration = value.expiration-time()
if expiration > 0:
self.data[key] = value
reactor.callLater(limit(expiration, max=3600), self.data.pop, key, None)
def flush(self, key=None):
if key is not None:
self.data.pop(key, None)
else:
self.data = {}
class InternalResolver(dns.resolver.Resolver):
def __init__(self, *args, **kw):
super(InternalResolver, self).__init__(*args, **kw)
if self.domain.to_text().endswith('local.'):
self.domain = dns.name.root
self.search = [item for item in self.search if not item.to_text().endswith('local.')]
class DNSResolver(dns.resolver.Resolver):
"""
The resolver used by DNSLookup.
The lifetime setting on it applies to all the queries made on this resolver.
Each time a query is performed, its duration is subtracted from the lifetime
value.
"""
def __init__(self):
dns.resolver.Resolver.__init__(self, configure=False)
dns_manager = DNSManager()
self.search = dns_manager.search
self.domain = dns_manager.domain
self.nameservers = dns_manager.nameservers
def query(self, *args, **kw):
start_time = time()
try:
return dns.resolver.Resolver.query(self, *args, **kw)
finally:
self.lifetime -= min(self.lifetime, time()-start_time)
class SRVResult(object):
"""
Internal object used to save the result of SRV queries.
"""
def __init__(self, priority, weight, port, address):
self.priority = priority
self.weight = weight
self.port = port
self.address = address
class NAPTRResult(object):
"""
Internal object used to save the result of NAPTR queries.
"""
def __init__(self, service, order, preference, priority, weight, port, address):
self.service = service
self.order = order
self.preference = preference
self.priority = priority
self.weight = weight
self.port = port
self.address = address
class DNSLookup(object):
cache = DNSCache()
@run_in_waitable_green_thread
@post_dns_lookup_notifications
def lookup_service(self, uri, service, timeout=3.0, lifetime=15.0):
"""
Performs an SRV query to determine the servers used for the specified
service from the domain in uri.host. If this fails and falling back is
supported, also performs an A query on uri.host, returning the default
port of the service along with the IP addresses in the answer.
The services supported are `stun' and 'msrprelay'.
The DNSLookupDidSucceed notification contains a result attribute which
is a list of (address, port) tuples. The DNSLookupDidFail notification
contains an error attribute describing the error encountered.
"""
service_srv_record_map = {"stun": ("_stun._udp", 3478, False),
"msrprelay": ("_msrps._tcp", 2855, True)}
log_context = dict(context='lookup_service', service=service, uri=uri)
try:
service_prefix, service_port, service_fallback = service_srv_record_map[service]
except KeyError:
raise DNSLookupError("Unknown service: %s" % service)
try:
# If the host part of the URI is an IP address, we will not do any lookup
if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host.decode()):
return [(uri.host.decode(), uri.port or service_port)]
resolver = DNSResolver()
resolver.cache = self.cache
resolver.timeout = timeout
resolver.lifetime = lifetime
record_name = '%s.%s' % (service_prefix, uri.host.decode())
services = self._lookup_srv_records(resolver, [record_name], log_context=log_context)
if services[record_name]:
return [(result.address, result.port) for result in services[record_name]]
elif service_fallback:
addresses = self._lookup_a_records(resolver, [uri.host.decode()], log_context=log_context)
if addresses[uri.host.decode()]:
return [(addr, service_port) for addr in addresses[uri.host.decode()]]
except dns.resolver.Timeout:
raise DNSLookupError('Timeout in lookup for %s servers for domain %s' % (service, uri.host.decode()))
else:
raise DNSLookupError('No %s servers found for domain %s' % (service, uri.host.decode()))
@run_in_waitable_green_thread
@post_dns_lookup_notifications
- def lookup_sip_proxy(self, uri, supported_transports, timeout=3.0, lifetime=15.0):
+ def lookup_sip_proxy(self, uri, supported_transports, timeout=3.0, lifetime=15.0, tls_name=None):
"""
Performs an RFC 3263 compliant lookup of transport/ip/port combinations
for a particular SIP URI. As arguments it takes a SIPURI object
and a list of supported transports, in order of preference of the
application. It returns a list of Route objects that can be used in
order of preference.
The DNSLookupDidSucceed notification contains a result attribute which
is a list of Route objects. The DNSLookupDidFail notification contains
an error attribute describing the error encountered.
+
+ Set tls_name to the Common Name the server must present
"""
naptr_service_transport_map = {"sips+d2t": "tls",
"sip+d2t": "tcp",
"sip+d2u": "udp"}
transport_service_map = {"udp": "_sip._udp",
"tcp": "_sip._tcp",
"tls": "_sips._tcp"}
log_context = dict(context='lookup_sip_proxy', uri=uri)
if not supported_transports:
raise DNSLookupError("No transports are supported")
supported_transports = [transport.lower() for transport in supported_transports]
unknown_transports = set(supported_transports).difference(transport_service_map)
if unknown_transports:
raise DNSLookupError("Unknown transports: %s" % ', '.join(unknown_transports))
try:
# If the host part of the URI is an IP address, we will not do any lookup
transport = uri.transport.decode() if isinstance(uri.transport, bytes) else uri.transport
if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host.decode()):
transport = 'tls' if uri.secure else transport.lower()
if transport not in supported_transports:
raise DNSLookupError("IP transport %s dictated by URI is not supported" % transport)
port = uri.port or (5061 if transport=='tls' else 5060)
route = [Route(address=uri.host, port=port, transport=transport)]
return route
resolver = DNSResolver()
resolver.cache = self.cache
resolver.timeout = timeout
resolver.lifetime = lifetime
# If the port is specified in the URI, we will only do an A lookup
if uri.port:
transport = 'tls' if uri.secure else transport.lower()
if transport not in supported_transports:
raise DNSLookupError("Host transport %s dictated by URI is not supported" % transport)
addresses = self._lookup_a_records(resolver, [uri.host.decode()], log_context=log_context)
if addresses[uri.host.decode()]:
- return [Route(address=addr, port=uri.port, transport=transport, tls_name=uri.host) for addr in addresses[uri.host.decode()]]
+ return [Route(address=addr, port=uri.port, transport=transport, tls_name=tls_name or uri.host) for addr in addresses[uri.host.decode()]]
# If the transport was already set as a parameter on the SIP URI, only do SRV lookups
elif 'transport' in uri.parameters:
transport = uri.parameters['transport'].lower()
if transport not in supported_transports:
raise DNSLookupError("Requested lookup for URI with %s transport, but it is not supported" % transport)
if uri.secure and transport != 'tls':
raise DNSLookupError("Requested lookup for SIPS URI, but with %s transport parameter" % transport)
record_name = '%s.%s' % (transport_service_map[transport], uri.host.decode())
services = self._lookup_srv_records(resolver, [record_name], log_context=log_context)
if services[record_name]:
- return [Route(address=result.address, port=result.port, transport=transport, tls_name=uri.host) for result in services[record_name]]
+ return [Route(address=result.address, port=result.port, transport=transport, tls_name=tls_name or uri.host) for result in services[record_name]]
else:
# If SRV lookup fails, try A lookup
addresses = self._lookup_a_records(resolver, [uri.host.decode()], log_context=log_context)
port = 5061 if transport=='tls' else 5060
if addresses[uri.host.decode()]:
- return [Route(address=addr, port=port, transport=transport, tls_name=uri.host) for addr in addresses[uri.host.decode()]]
+ return [Route(address=addr, port=port, transport=transport, tls_name=tls_name or uri.host) for addr in addresses[uri.host.decode()]]
# Otherwise, it means we don't have a numeric IP address, a port isn't specified and neither is a transport. So we have to do a full NAPTR lookup
else:
# If the URI is a SIPS URI, we only support the TLS transport.
if uri.secure:
if 'tls' not in supported_transports:
raise DNSLookupError("Requested lookup for SIPS URI, but TLS transport is not supported")
supported_transports = ['tls']
# First try NAPTR lookup
naptr_services = [service for service, transport in list(naptr_service_transport_map.items()) if transport in supported_transports]
try:
pointers = self._lookup_naptr_record(resolver, uri.host.decode(), naptr_services, log_context=log_context)
except dns.resolver.Timeout:
pointers = []
if pointers:
- return [Route(address=result.address, port=result.port, transport=naptr_service_transport_map[result.service], tls_name=uri.host) for result in pointers]
+ return [Route(address=result.address, port=result.port, transport=naptr_service_transport_map[result.service], tls_name=tls_name or uri.host) for result in pointers]
else:
# If that fails, try SRV lookup
routes = []
for transport in supported_transports:
record_name = '%s.%s' % (transport_service_map[transport], uri.host.decode())
try:
services = self._lookup_srv_records(resolver, [record_name], log_context=log_context)
except dns.resolver.Timeout:
continue
if services[record_name]:
- routes.extend(Route(address=result.address, port=result.port, transport=transport, tls_name=uri.host) for result in services[record_name])
+ routes.extend(Route(address=result.address, port=result.port, transport=transport, tls_name=tls_name or uri.host) for result in services[record_name])
if routes:
return routes
else:
# If SRV lookup fails, try A lookup
transport = 'tls' if uri.secure else 'udp'
if transport in supported_transports:
addresses = self._lookup_a_records(resolver, [uri.host.decode()], log_context=log_context)
port = 5061 if transport=='tls' else 5060
if addresses[uri.host.decode()]:
- return [Route(address=addr, port=port, transport=transport, tls_name=uri.host) for addr in addresses[uri.host.decode()]]
+ return [Route(address=addr, port=port, transport=transport, tls_name=tls_name or uri.host) for addr in addresses[uri.host.decode()]]
except dns.resolver.Timeout:
raise DNSLookupError("Timeout in lookup for routes for SIP URI %s" % uri)
else:
raise DNSLookupError("No routes found for SIP URI %s" % uri)
@run_in_waitable_green_thread
@post_dns_lookup_notifications
def lookup_xcap_server(self, uri, timeout=3.0, lifetime=15.0):
"""
Performs a TXT query against xcap.<uri.host> and returns all results
that look like HTTP URIs.
"""
log_context = dict(context='lookup_xcap_server', uri=uri)
notification_center = NotificationCenter()
try:
# If the host part of the URI is an IP address, we cannot not do any lookup
if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", uri.host.decode()):
raise DNSLookupError("Cannot perform DNS query because the host is an IP address")
resolver = DNSResolver()
resolver.cache = self.cache
resolver.timeout = timeout
resolver.lifetime = lifetime
record_name = 'xcap.%s' % uri.host.decode()
results = []
try:
answer = resolver.query(record_name, rdatatype.TXT)
except dns.resolver.Timeout as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
raise
except exception.DNSException as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
else:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='TXT', query_name=str(record_name), nameservers=resolver.nameservers, answer=answer, error=None, **log_context))
for result_uri in list(chain(*(r.strings for r in answer.rrset))):
parsed_uri = urlparse(result_uri.decode())
if parsed_uri.scheme in ('http', 'https') and parsed_uri.netloc:
results.append(result_uri.decode())
if not results:
raise DNSLookupError('No XCAP servers found for domain %s' % uri.host.decode())
return results
except dns.resolver.Timeout:
raise DNSLookupError('Timeout in lookup for XCAP servers for domain %s' % uri.host.decode())
def _lookup_a_records(self, resolver, hostnames, additional_records=[], log_context={}):
notification_center = NotificationCenter()
additional_addresses = dict((rset.name.to_text(), rset) for rset in additional_records if rset.rdtype == rdatatype.A)
addresses = {}
for hostname in hostnames:
if hostname in additional_addresses:
addresses[hostname] = [r.address for r in additional_addresses[hostname]]
else:
try:
answer = resolver.query(hostname, rdatatype.A)
except dns.resolver.Timeout as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
raise
except exception.DNSException as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
addresses[hostname] = []
else:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='A', query_name=str(hostname), nameservers=resolver.nameservers, answer=answer, error=None, **log_context))
addresses[hostname] = [r.address for r in answer.rrset]
return addresses
def _lookup_srv_records(self, resolver, srv_names, additional_records=[], log_context={}):
notification_center = NotificationCenter()
additional_services = dict((rset.name.to_text(), rset) for rset in additional_records if rset.rdtype == rdatatype.SRV)
services = {}
for srv_name in srv_names:
services[srv_name] = []
if srv_name in additional_services:
addresses = self._lookup_a_records(resolver, [r.target.to_text() for r in additional_services[srv_name]], additional_records)
for record in additional_services[srv_name]:
services[srv_name].extend(SRVResult(record.priority, record.weight, record.port, addr) for addr in addresses.get(record.target.to_text(), ()))
else:
try:
answer = resolver.query(srv_name, rdatatype.SRV)
except dns.resolver.Timeout as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
raise
except exception.DNSException as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
else:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='SRV', query_name=str(srv_name), nameservers=resolver.nameservers, answer=answer, error=None, **log_context))
addresses = self._lookup_a_records(resolver, [r.target.to_text() for r in answer.rrset], answer.response.additional, log_context)
for record in answer.rrset:
services[srv_name].extend(SRVResult(record.priority, record.weight, record.port, addr) for addr in addresses.get(record.target.to_text(), ()))
services[srv_name].sort(key=lambda result: (result.priority, -result.weight))
return services
def _lookup_naptr_record(self, resolver, domain, services, log_context={}):
notification_center = NotificationCenter()
pointers = []
try:
answer = resolver.query(domain, rdatatype.NAPTR)
except dns.resolver.Timeout as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
raise
except exception.DNSException as e:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=None, error=e, **log_context))
else:
notification_center.post_notification('DNSLookupTrace', sender=self, data=NotificationData(query_type='NAPTR', query_name=str(domain), nameservers=resolver.nameservers, answer=answer, error=None, **log_context))
records = [r for r in answer.rrset if r.service.decode().lower() in services]
services = self._lookup_srv_records(resolver, [r.replacement.to_text() for r in records], answer.response.additional, log_context)
for record in records:
pointers.extend(NAPTRResult(record.service.decode().lower(), record.order, record.preference, r.priority, r.weight, r.port, r.address) for r in services.get(record.replacement.to_text(), ()))
pointers.sort(key=lambda result: (result.order, result.preference))
return pointers
@implementer(IObserver)
class DNSManager(object, metaclass=Singleton):
def __init__(self):
try:
default_resolver = InternalResolver()
except dns.resolver.NoResolverConfiguration:
default_resolver = Null
self.search = default_resolver.search
self.domain = default_resolver.domain
self.google_nameservers = ['8.8.8.8', '8.8.4.4']
self.nameservers = default_resolver.nameservers or []
self.probed_domain = 'sip2sip.info.'
self._channel = coros.queue()
self._proc = None
self._timer = None
self._wakeup_timer = None
notification_center = NotificationCenter()
notification_center.add_observer(self, name='SystemIPAddressDidChange')
notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
@property
def nameservers(self):
return self.__dict__['nameservers']
@nameservers.setter
def nameservers(self, value):
old_value = self.__dict__.get('nameservers', Null)
self.__dict__['nameservers'] = value
if old_value is Null:
NotificationCenter().post_notification('DNSResolverDidInitialize', sender=self, data=NotificationData(nameservers=value))
elif value != old_value:
NotificationCenter().post_notification('DNSNameserversDidChange', sender=self, data=NotificationData(nameservers=value))
def start(self):
self._proc = proc.spawn(self._run)
self._channel.send(Command('probe_dns'))
def stop(self):
if self._proc is not None:
self._proc.kill()
self._proc = None
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
if self._wakeup_timer is not None and self._wakeup_timer.active():
self._wakeup_timer.cancel()
self._wakeup_timer = None
def _run(self):
while True:
try:
command = self._channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
except InterruptCommand:
pass
def _CH_probe_dns(self, command):
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
try:
resolver = InternalResolver()
except dns.resolver.NoResolverConfiguration as e:
self._timer = reactor.callLater(15, self._channel.send, Command('probe_dns'))
return
self.domain = resolver.domain
self.search = resolver.search
local_nameservers = resolver.nameservers
# probe local resolver
resolver.timeout = 1
resolver.lifetime = 3
try:
answer = resolver.query(self.probed_domain, rdatatype.NAPTR)
if not any(record.rdtype == rdatatype.NAPTR for record in answer.rrset):
raise exception.DNSException("No NAPTR records found")
answer = resolver.query("_sip._udp.%s" % self.probed_domain, rdatatype.SRV)
if not any(record.rdtype == rdatatype.SRV for record in answer.rrset):
raise exception.DNSException("No SRV records found")
except (dns.resolver.Timeout, exception.DNSException):
pass
else:
self.nameservers = resolver.nameservers
return
# local resolver failed. probe google resolver
resolver.nameservers = self.google_nameservers
resolver.timeout = 2
resolver.lifetime = 4
try:
answer = resolver.query(self.probed_domain, rdatatype.NAPTR)
if not any(record.rdtype == rdatatype.NAPTR for record in answer.rrset):
raise exception.DNSException("No NAPTR records found")
except (dns.resolver.Timeout, exception.DNSException):
pass
else:
self.nameservers = resolver.nameservers
return
# google resolver failed. fallback to local resolver and schedule another probe for later
self.nameservers = local_nameservers
self._timer = reactor.callLater(15, self._channel.send, Command('probe_dns'))
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SystemIPAddressDidChange(self, notification):
self._proc.kill(InterruptCommand)
self._channel.send(Command('probe_dns'))
def _NH_SystemDidWakeUpFromSleep(self, notification):
if self._wakeup_timer is None:
def wakeup_action():
self._proc.kill(InterruptCommand)
self._channel.send(Command('probe_dns'))
self._wakeup_timer = None
self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
diff --git a/sipsimple/session.py b/sipsimple/session.py
index b4427239..0e3ebbc2 100644
--- a/sipsimple/session.py
+++ b/sipsimple/session.py
@@ -1,2751 +1,2754 @@
"""
Implements an asynchronous notification based mechanism for
establishment, modification and termination of sessions using Session
Initiation Protocol (SIP) standardized in RFC3261.
"""
__all__ = ['Session', 'SessionManager']
import random
from threading import RLock
from time import time
from application.notification import IObserver, Notification, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.decorator import decorator, preserve_signature
from application.python.types import Singleton
from application.system import host
from eventlib import api, coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple import log
from sipsimple.account import AccountManager, BonjourAccount
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import DialogID, Engine, Invitation, Referral, Subscription, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, SIPURI, sip_status_messages, sipfrag_re
from sipsimple.core import ContactHeader, FromHeader, Header, ReasonHeader, ReferToHeader, ReplacesHeader, RouteHeader, ToHeader, WarningHeader
from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession
from sipsimple.core import PublicGRUU, PublicGRUUIfAvailable, NoGRUU
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import ParserError
from sipsimple.payloads.conference import ConferenceDocument
from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from sipsimple.util import ISOTimestamp
class InvitationDisconnectedError(Exception):
def __init__(self, invitation, data):
self.invitation = invitation
self.data = data
class MediaStreamDidNotInitializeError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class MediaStreamDidFailError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, timeout, **attributes):
self.error = error
self.timeout = timeout
self.attributes = attributes
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class InterruptSubscription(Exception):
pass
class TerminateSubscription(Exception):
pass
class ReferralError(Exception):
def __init__(self, error, code=0):
self.error = error
self.code = code
class TerminateReferral(Exception):
pass
class SIPReferralDidFail(Exception):
def __init__(self, data):
self.data = data
class IllegalStateError(RuntimeError):
pass
class IllegalDirectionError(RuntimeError):
pass
class SIPInvitationTransferDidFail(Exception):
def __init__(self, data):
self.data = data
@decorator
def transition_state(required_state, new_state):
def state_transitioner(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
with obj._lock:
if obj.state != required_state:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
obj.state = new_state
return func(obj, *args, **kwargs)
return wrapper
return state_transitioner
@decorator
def check_state(required_states):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.state not in required_states:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
return func(obj, *args, **kwargs)
return wrapper
return state_checker
@decorator
def check_transfer_state(direction, state):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.transfer_handler.direction != direction:
raise IllegalDirectionError('cannot transfer in %s direction' % obj.transfer_handler.direction)
if obj.transfer_handler.state != state:
raise IllegalStateError('cannot transfer in %s state' % obj.transfer_handler.state)
return func(obj, *args, **kwargs)
return wrapper
return state_checker
class AddParticipantOperation(object):
pass
class RemoveParticipantOperation(object):
pass
@implementer(IObserver)
class ReferralHandler(object):
def __init__(self, session, participant_uri, operation):
self.participant_uri = participant_uri
if not isinstance(self.participant_uri, SIPURI):
if not self.participant_uri.startswith(('sip:', 'sips:')):
self.participant_uri = 'sip:%s' % self.participant_uri
try:
self.participant_uri = SIPURI.parse(self.participant_uri)
except SIPCoreError:
notification_center = NotificationCenter()
if operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI'))
return
self.session = session
self.operation = operation
self.active = False
self.route = None
self._channel = coros.queue()
self._referral = None
def start(self):
notification_center = NotificationCenter()
if not self.session.remote_focus:
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus'))
self.session = None
return
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, name='NetworkConditionsDidChange')
proc.spawn(self._run)
def _run(self):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account is BonjourAccount():
uri = SIPURI.new(self.session._invitation.remote_contact_header.uri)
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
+ tls_name = account.sip.tls_name if account is not BonjourAccount() else None
+ routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list, tls_name=tls_name).wait()
except DNSLookupError as e:
timeout = random.uniform(15, 30)
raise ReferralError(error='DNS lookup failed: %s' % e)
target_uri = SIPURI.new(self.session.remote_identity.uri)
timeout = time() + 30
for route in routes:
self.route = route
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = account.contact[NoGRUU, route]
except KeyError:
continue
refer_to_header = ReferToHeader(str(self.participant_uri))
refer_to_header.parameters['method'] = 'INVITE' if self.operation is AddParticipantOperation else 'BYE'
referral = Referral(target_uri, FromHeader(account.uri, account.display_name),
ToHeader(target_uri),
refer_to_header,
ContactHeader(contact_uri),
RouteHeader(route.uri),
account.credentials)
notification_center.add_observer(self, sender=referral)
try:
referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=referral)
timeout = 5
raise ReferralError(error='Internal error')
self._referral = referral
try:
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidStart':
break
except SIPReferralDidFail as e:
notification_center.remove_observer(self, sender=referral)
self._referral = None
if e.data.code in (403, 405):
raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
else:
# Otherwise just try the next route
continue
else:
break
else:
self.route = None
raise ReferralError(error='No more routes to try')
# At this point it is subscribed. Handle notifications and ending/failures.
try:
self.active = True
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralGotNotify':
if notification.data.event == 'refer' and notification.data.body:
match = sipfrag_re.match(notification.data.body)
if match:
code = int(match.group('code'))
reason = match.group('reason')
if code/100 > 2:
continue
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceGotAddParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason))
else:
notification_center.post_notification('SIPConferenceGotRemoveParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason))
elif notification.name == 'SIPReferralDidEnd':
break
except SIPReferralDidFail as e:
notification_center.remove_observer(self, sender=self._referral)
raise ReferralError(error=e.data.reason, code=e.data.code)
else:
notification_center.remove_observer(self, sender=self._referral)
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri))
else:
notification_center.post_notification('SIPConferenceDidRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri))
finally:
self.active = False
except TerminateReferral:
if self._referral is not None:
try:
self._referral.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidEnd':
break
except SIPReferralDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._referral)
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error'))
except ReferralError as e:
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error))
finally:
notification_center.remove_observer(self, sender=self.session)
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self.session = None
self._referral = None
def _refresh(self):
try:
contact_header = ContactHeader(self.session.account.contact[NoGRUU, self.route])
except KeyError:
pass
else:
try:
self._referral.refresh(contact_header=contact_header, timeout=2)
except (SIPCoreError, SIPCoreInvalidStateError):
pass
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPReferralDidStart(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidEnd(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidFail(self, notification):
self._channel.send_exception(SIPReferralDidFail(notification.data))
def _NH_SIPReferralGotNotify(self, notification):
self._channel.send(notification)
def _NH_SIPSessionDidFail(self, notification):
self._channel.send_exception(TerminateReferral())
def _NH_SIPSessionWillEnd(self, notification):
self._channel.send_exception(TerminateReferral())
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._refresh()
@implementer(IObserver)
class ConferenceHandler(object):
def __init__(self, session):
self.session = session
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
@run_in_green_thread
def add_participant(self, participant_uri):
referral_handler = ReferralHandler(self.session, participant_uri, AddParticipantOperation)
referral_handler.start()
@run_in_green_thread
def remove_participant(self, participant_uri):
referral_handler = ReferralHandler(self.session, participant_uri, RemoveParticipantOperation)
referral_handler.start()
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _activate(self):
self.active = True
command = Command('subscribe')
self._command_channel.send(command)
return command
def _deactivate(self):
self.active = False
command = Command('unsubscribe')
self._command_channel.send(command)
return command
def _resubscribe(self):
command = Command('subscribe')
self._command_channel.send(command)
return command
def _terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session)
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._deactivate()
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self.session = None
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
command.signal()
raise proc.ProcExit()
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account is BonjourAccount():
uri = SIPURI.new(self.session._invitation.remote_contact_header.uri)
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
+ tls_name = account.sip.tls_name if account is not BonjourAccount() else None
+ routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list, tls_name=tls_name).wait()
except DNSLookupError as e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
target_uri = SIPURI.new(self.session.remote_identity.uri)
default_interval = 600 if account is BonjourAccount() else account.sip.subscribe_interval
refresh_interval = getattr(command, 'refresh_interval', default_interval)
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = account.contact[NoGRUU, route]
except KeyError:
continue
subscription = Subscription(target_uri, FromHeader(account.uri, account.display_name),
ToHeader(target_uri),
ContactHeader(contact_uri),
b'conference',
RouteHeader(route.uri),
credentials=account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError as e:
notification_center.remove_observer(self, sender=subscription)
timeout = 5
raise SubscriptionError(error='Internal error %s' % str(e), timeout=timeout)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription:
break
except SIPSubscriptionDidFail as e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
timeout = random.uniform(60, 120)
raise SubscriptionError(error='Authentication failed', timeout=timeout)
elif e.data.code == 423:
# Get the value of the Min-Expires header
timeout = random.uniform(60, 120)
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires)
else:
raise SubscriptionError(error='Interval too short', timeout=timeout)
elif e.data.code in (405, 406, 489, 1400):
command.signal(e)
return
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
timeout = random.uniform(60, 180)
raise SubscriptionError(error='No more routes to try', timeout=timeout)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
if notification.sender is not self._subscription:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'conference' and notification.data.body:
try:
conference_info = ConferenceDocument.parse(notification.data.body)
except ParserError:
pass
else:
notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info))
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
except TerminateSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
except SubscriptionError as e:
if 'min_expires' in e.attributes:
command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires'])
else:
command = Command('subscribe', command.event)
self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_SIPSessionDidStart(self, notification):
if self.session.remote_focus:
self._activate()
@run_in_green_thread
def _NH_SIPSessionDidFail(self, notification):
self._terminate()
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
self._terminate()
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
if self.session.remote_focus and not self.active:
self._activate()
elif not self.session.remote_focus and self.active:
self._deactivate()
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._resubscribe()
class TransferInfo(object):
def __init__(self, referred_by=None, replaced_dialog_id=None):
self.referred_by = referred_by
self.replaced_dialog_id = replaced_dialog_id
@implementer(IObserver)
class TransferHandler(object):
def __init__(self, session):
self.state = None
self.direction = None
self.new_session = None
self.session = session
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, sender=self.session._invitation)
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._proc = proc.spawn(self._run)
self.completed = False
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
self.direction = None
self.state = None
def _CH_incoming_transfer(self, command):
self.direction = 'incoming'
notification_center = NotificationCenter()
refer_to_hdr = command.data.headers.get('Refer-To')
target = SIPURI.parse(refer_to_hdr.uri)
referred_by_hdr = command.data.headers.get('Referred-By', None)
if referred_by_hdr is not None:
origin = referred_by_hdr.body
else:
origin = str(self.session.remote_identity.uri)
try:
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
self.state = 'failed'
return
else:
if notification.name == 'SIPInvitationTransferDidStart':
self.state = 'starting'
refer_to_uri = SIPURI.new(target)
refer_to_uri.headers = {}
refer_to_uri.parameters = {}
notification_center.post_notification('SIPSessionTransferNewIncoming', self.session, NotificationData(transfer_destination=refer_to_uri))
elif notification.name == 'SIPSessionTransferDidStart':
break
elif notification.name == 'SIPSessionTransferDidFail':
self.state = 'failed'
try:
self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason)
except SIPCoreError:
return
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
self.state = 'started'
transfer_info = TransferInfo(referred_by=origin)
try:
replaces_hdr = target.headers.pop('Replaces')
call_id, rest = replaces_hdr.split(';', 1)
params = dict((item.split('=') for item in rest.split(';')))
to_tag = params.get('to-tag')
from_tag = params.get('from-tag')
except (KeyError, ValueError):
pass
else:
transfer_info.replaced_dialog_id = DialogID(call_id, local_tag=from_tag, remote_tag=to_tag)
settings = SIPSimpleSettings()
account = self.session.account
if account is BonjourAccount():
uri = target
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = target
lookup = DNSLookup()
try:
- routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
+ tls_name = account.sip.tls_name if account is not BonjourAccount() else None
+ routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list, tls_name=tls_name).wait()
except DNSLookupError as e:
self.state = 'failed'
notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=0, reason="DNS lookup failed: {}".format(e)))
try:
self.session._invitation.notify_transfer_progress(480)
except SIPCoreError:
return
while True:
try:
self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
return
self.new_session = Session(account)
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.new_session)
self.new_session.connect(ToHeader(target), routes=routes, streams=[MediaStreamRegistry.AudioStream()], transfer_info=transfer_info)
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
if notification.name == 'SIPInvitationTransferDidEnd':
return
except proc.ProcExit:
if self.new_session is not None:
notification_center.remove_observer(self, sender=self.new_session)
self.new_session = None
raise
def _CH_outgoing_transfer(self, command):
self.direction = 'outgoing'
notification_center = NotificationCenter()
self.completed = False
self.state = 'starting'
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail as e:
self.state = None
self.direction = None
notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason))
return
if notification.name == 'SIPInvitationTransferDidStart':
self.state = 'started'
notification_center.post_notification('SIPSessionTransferDidStart', sender=self.session)
elif notification.name == 'SIPInvitationTransferDidEnd':
self.state = None
self.direction = None
self.session.end()
notification_center.post_notification('SIPSessionTransferDidEnd', sender=self.session)
return
def _terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session._invitation)
notification_center.remove_observer(self, sender=self.session)
self._proc.kill()
self._proc = None
self._command_channel = None
self._data_channel = None
self.session = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPInvitationTransferNewIncoming(self, notification):
self._command_channel.send(Command('incoming_transfer', data=notification.data))
def _NH_SIPInvitationTransferNewOutgoing(self, notification):
self._command_channel.send(Command('outgoing_transfer', data=notification.data))
def _NH_SIPInvitationTransferDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPInvitationTransferDidFail(self, notification):
self.direction = None
if not self.completed:
self._data_channel.send_exception(SIPInvitationTransferDidFail(notification.data))
def _NH_SIPInvitationTransferDidEnd(self, notification):
self.direction = None
self._data_channel.send(notification)
def _NH_SIPInvitationTransferGotNotify(self, notification):
if notification.data.event == 'refer' and notification.data.body:
match = sipfrag_re.match(notification.data.body)
if match:
code = int(match.group('code'))
reason = match.group('reason')
notification.center.post_notification('SIPSessionTransferGotProgress', sender=self.session, data=NotificationData(code=code, reason=reason))
if code == 200:
self.completed = True
self.direction = None
self.state = None
notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session)
elif code >= 400:
self.state = None
self.direction = None
notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=code, reason=reason))
def _NH_SIPSessionTransferDidStart(self, notification):
if notification.sender is self.session and self.state == 'starting':
self._data_channel.send(notification)
def _NH_SIPSessionTransferDidFail(self, notification):
if notification.sender is self.session and self.state == 'starting':
self._data_channel.send(notification)
def _NH_SIPSessionGotRingIndication(self, notification):
if notification.sender is self.new_session and self.session is not None:
try:
self.session._invitation.notify_transfer_progress(180)
except SIPCoreError:
pass
def _NH_SIPSessionGotProvisionalResponse(self, notification):
if notification.sender is self.new_session and self.session is not None:
try:
self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason)
except SIPCoreError:
pass
def _NH_SIPSessionDidStart(self, notification):
if notification.sender is self.new_session:
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session)
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(200)
except SIPCoreError:
pass
self.state = 'ended'
self.session.end()
def _NH_SIPSessionDidEnd(self, notification):
if notification.sender is self.new_session:
# If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=500, reason='internal error'))
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(500)
except SIPCoreError:
pass
self.state = 'failed'
else:
self._terminate()
def _NH_SIPSessionDidFail(self, notification):
if notification.sender is self.new_session:
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=notification.data.code or 500, reason=notification.data.reason))
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(notification.data.code or 500, notification.data.reason)
except SIPCoreError:
pass
self.state = 'failed'
else:
self._terminate()
class OptionalTag(str):
def __eq__(self, other):
return other is None or super(OptionalTag, self).__eq__(other)
def __ne__(self, other):
return not self == other
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, super(OptionalTag, self).__repr__())
@implementer(IObserver)
class SessionReplaceHandler(object):
def __init__(self, session):
self.session = session
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, sender=self.session.replaced_session)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidStart(self, notification):
notification.center.remove_observer(self, sender=self.session)
notification.center.remove_observer(self, sender=self.session.replaced_session)
self.session.replaced_session.end()
self.session.replaced_session = None
self.session = None
def _NH_SIPSessionDidFail(self, notification):
if notification.sender is self.session:
notification.center.remove_observer(self, sender=self.session)
notification.center.remove_observer(self, sender=self.session.replaced_session)
self.session.replaced_session = None
self.session = None
_NH_SIPSessionDidEnd = _NH_SIPSessionDidFail
@implementer(IObserver)
class Session(object):
media_stream_timeout = 15
short_reinvite_timeout = 5
def __init__(self, account):
self.account = account
self.direction = None
self.end_time = None
self.on_hold = False
self.proposed_streams = None
self.route = None
self.state = None
self.start_time = None
self.streams = None
self.transport = None
self.local_focus = False
self.remote_focus = False
self.greenlet = None
self.conference = None
self.replaced_session = None
self.transfer_handler = None
self.transfer_info = None
self._channel = coros.queue()
self._hold_in_progress = False
self._invitation = None
self._local_identity = None
self._remote_identity = None
self._lock = RLock()
def init_incoming(self, invitation, data):
notification_center = NotificationCenter()
remote_sdp = invitation.sdp.proposed_remote
self.proposed_streams = []
if remote_sdp:
for index, media_stream in enumerate(remote_sdp.media):
if media_stream.port != 0:
for stream_type in MediaStreamRegistry:
try:
stream = stream_type.new_from_sdp(self, remote_sdp, index)
except UnknownStreamError as e:
continue
except InvalidStreamError as e:
log.error("Invalid stream: {}".format(e))
break
except Exception as e:
log.exception("Exception occurred while setting up stream from SDP: {}".format(e))
break
else:
stream.index = index
self.proposed_streams.append(stream)
break
self.direction = 'incoming'
self.state = 'incoming'
self.transport = invitation.transport.lower()
self._invitation = invitation
self.conference = ConferenceHandler(self)
self.transfer_handler = TransferHandler(self)
if 'isfocus' in invitation.remote_contact_header.parameters:
self.remote_focus = True
if 'Referred-By' in data.headers or 'Replaces' in data.headers:
self.transfer_info = TransferInfo()
if 'Referred-By' in data.headers:
self.transfer_info.referred_by = data.headers['Referred-By'].body
if 'Replaces' in data.headers:
replaces_header = data.headers.get('Replaces')
# Because we only allow the remote tag to be optional, it can only match established dialogs and early outgoing dialogs, but not early incoming dialogs,
# which according to RFC3891 should be rejected with 481 (which will happen automatically by never matching them).
if replaces_header.early_only or replaces_header.from_tag == '0':
replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=OptionalTag(replaces_header.from_tag))
else:
replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag)
session_manager = SessionManager()
try:
replaced_session = next(session for session in session_manager.sessions if session.dialog_id == replaced_dialog_id)
except StopIteration:
invitation.send_response(481)
return
else:
# Any matched dialog at this point is either established, terminated or early outgoing.
if replaced_session.state in ('terminating', 'terminated'):
invitation.send_response(603)
return
elif replaced_session.dialog_id.remote_tag is not None and replaces_header.early_only: # The replaced dialog is established, but the early-only flag is set
invitation.send_response(486)
return
self.replaced_session = replaced_session
self.transfer_info.replaced_dialog_id = replaced_dialog_id
replace_handler = SessionReplaceHandler(self)
replace_handler.start()
notification_center.add_observer(self, sender=invitation)
notification_center.post_notification('SIPSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams[:], headers=data.headers))
@transition_state(None, 'connecting')
@run_in_green_thread
def connect(self, to_header, routes, streams, is_focus=False, transfer_info=None, extra_headers=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
connected = False
received_code = 0
received_reason = None
unhandled_notifications = []
extra_headers = extra_headers or []
if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers):
raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed')
self.direction = 'outgoing'
self.proposed_streams = streams
self.route = routes[0]
self.transport = self.route.transport.lower()
self.local_focus = is_focus
self._invitation = Invitation()
self._local_identity = FromHeader(self.account.uri, self.account.display_name)
self._remote_identity = to_header
self.conference = ConferenceHandler(self)
self.transfer_handler = TransferHandler(self)
self.transfer_info = transfer_info
notification_center.add_observer(self, sender=self._invitation)
notification_center.post_notification('SIPSessionNewOutgoing', sender=self, data=NotificationData(streams=streams[:]))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
try:
contact_uri = self.account.contact[PublicGRUUIfAvailable, self.route]
local_ip = host.outgoing_ip_for(self.route.address)
if local_ip is None:
raise ValueError("could not get outgoing IP address")
except (KeyError, ValueError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e))
return
connection = SDPConnection(local_ip.encode())
local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode())
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates):
media.connection = connection
local_sdp.media.append(media)
from_header = FromHeader(self.account.uri, self.account.display_name)
route_header = RouteHeader(self.route.uri)
contact_header = ContactHeader(contact_uri)
if is_focus:
contact_header.parameters['isfocus'] = None
if self.transfer_info is not None:
if self.transfer_info.referred_by is not None:
extra_headers.append(Header('Referred-By', self.transfer_info.referred_by))
if self.transfer_info.replaced_dialog_id is not None:
dialog_id = self.transfer_info.replaced_dialog_id
extra_headers.append(ReplacesHeader(dialog_id.call_id, dialog_id.local_tag, dialog_id.remote_tag))
self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, self.account.credentials, extra_headers)
try:
with api.timeout(settings.sip.invite_timeout):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self)
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.end()
return
notification_center.post_notification('SIPSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
# TODO: check if port is also 0 in local_sdp. In that case PJSIP disabled the stream because
# negotiation failed. If there are more streams, however, the negotiation is considered successful as a
# whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io
# OK, but we cannot really start the stream. -Saul
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
invitation_notifications = []
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
invitation_notifications.append(notification)
for notification in invitation_notifications:
self._channel.send(notification)
while not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self)
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed-out while starting'
elif isinstance(e, MediaStreamDidNotInitializeError):
error = 'media stream did not initialize: %s' % e.data.reason
else:
error = '%s media stream failed: %s' % (e.stream.type, e.data.reason)
self._fail(originator='local', code=0, reason=None, error=error)
except InvitationDisconnectedError as e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
# As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator))
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200]))
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
else:
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason))
code = e.data.code
reason = e.data.reason
elif e.data.disconnect_reason == 'timeout':
code = 408
reason = 'timeout'
else:
# TODO: we should know *exactly* when there are set -Saul
code = getattr(e.data, 'code', 0)
reason = getattr(e.data, 'reason', 'Session disconnected')
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
self.greenlet = None
except SIPCoreError as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = ISOTimestamp.now()
any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams)
if any_stream_ice:
self._reinvite_after_ice()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
def _reinvite_after_ice(self):
# This function does not do any error checking, it's designed to be called at the end of connect and add_stream
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for index, stream in enumerate(self.streams):
local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for index, stream in enumerate(self.streams):
stream.update(local_sdp, remote_sdp, index)
else:
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'disconnected':
self.end()
return
except Exception:
pass
finally:
self.state = 'connected'
self.greenlet = None
@check_state(['incoming', 'received_proposal'])
@run_in_green_thread
def send_ring_indication(self):
try:
self._invitation.send_response(180)
except SIPCoreInvalidStateError:
pass # The INVITE session might have already been cancelled; ignore the error
@transition_state('incoming', 'accepting')
@run_in_green_thread
def accept(self, streams, is_focus=False, extra_headers=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
self.local_focus = is_focus
connected = False
unhandled_notifications = []
extra_headers = extra_headers or []
if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers):
raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed')
if self.proposed_streams:
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
else:
for index, stream in enumerate(streams):
notification_center.add_observer(self, sender=stream)
stream.index = index
stream.initialize(self, direction='outgoing')
self.proposed_streams = streams
wait_count = len(self.proposed_streams)
try:
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
remote_sdp = self._invitation.sdp.proposed_remote
sdp_connection = remote_sdp.connection or next((media.connection for media in remote_sdp.media if media.connection is not None))
sdp_ip = sdp_connection.address.decode() if isinstance(sdp_connection.address, bytes) else sdp_connection.address
local_ip = host.outgoing_ip_for(sdp_ip) if sdp_ip != '0.0.0.0' else sdp_ip
if local_ip is None:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address')
return
connection = SDPConnection(local_ip.encode())
local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode())
if remote_sdp:
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
# TODO: broken for RTP streams here
media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
if not media.has_ice_attributes and not media.has_ice_candidates:
media.connection = connection
else:
media = SDPMediaStream.new(media)
media.connection = connection
media.port = 0
media.attributes = []
media.bandwidth_info = []
local_sdp.media.append(media)
else:
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates):
media.connection = connection
local_sdp.media.append(media)
contact_header = ContactHeader.new(self._invitation.local_contact_header)
try:
local_contact_uri = self.account.contact[PublicGRUU, self._invitation.transport]
except KeyError:
pass
else:
contact_header.uri = local_contact_uri
if is_focus:
contact_header.parameters['isfocus'] = None
self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers)
notification_center.post_notification('SIPSessionWillStart', sender=self)
# Local and remote SDPs will be set after the 200 OK is sent
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
if not connected:
# we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
while wait_count > 0 or not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e:
if self._invitation.state == 'connecting':
ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown'
# pjsip's invite session object does not inform us whether the ACK was received or not
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received))
elif self._invitation.state == 'connected' and not connected:
# we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
reason_header = None
if isinstance(e, api.TimeoutError):
if wait_count > 0:
error = 'media stream timed-out while starting'
else:
error = 'No ACK received'
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'Missing ACK'
elif isinstance(e, MediaStreamDidNotInitializeError):
error = 'media stream did not initialize: %s' % e.data.reason
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'media stream did not initialize'
else:
error = '%s media stream failed: %s' % (e.stream.type, e.data.reason)
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'media stream failed to start'
self.start_time = ISOTimestamp.now()
if self._invitation.state in ('incoming', 'early'):
self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header)
else:
self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header)
except InvitationDisconnectedError as e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if e.data.prev_state in ('incoming', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None))
elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote'))
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=getattr(e.data, 'method', 'INVITE'), code=200, reason='OK'))
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason))
self.greenlet = None
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
except SIPCoreError as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
finally:
self.greenlet = None
@transition_state('incoming', 'terminating')
@run_in_green_thread
def reject(self, code=603, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'disconnected':
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received))
break
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
self.greenlet = None
except SIPCoreError as e:
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
except api.TimeoutError:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None))
else:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None))
finally:
self.greenlet = None
@transition_state('received_proposal', 'accepting_proposal')
@run_in_green_thread
def accept_proposal(self, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
streams = [stream for stream in streams if stream in self.proposed_streams]
for stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
wait_count = len(streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
remote_sdp = self._invitation.sdp.proposed_remote
connection = SDPConnection(local_sdp.address)
stream_map = dict((stream.index, stream) for stream in streams)
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
if not media.has_ice_attributes and not media.has_ice_candidates:
media.connection = connection
if index < len(local_sdp.media):
local_sdp.media[index] = media
else:
local_sdp.media.append(media)
elif index >= len(local_sdp.media): # actually == is sufficient
media = SDPMediaStream.new(media)
media.connection = connection
media.port = 0
media.attributes = []
media.bandwidth_info = []
local_sdp.media.append(media)
self._invitation.send_response(200, sdp=local_sdp)
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
for stream in streams:
# TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream because
# negotiation failed. If there are more streams, however, the negotiation is considered successful as a
# whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind of
# OK, but we cannot really start the stream. -Saul
stream.start(local_sdp, remote_sdp, stream.index)
with api.timeout(self.media_stream_timeout):
wait_count = len(streams)
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
else:
unhandled_notifications.append(notification)
except api.TimeoutError:
self._fail_proposal(originator='remote', error='media stream timed-out while starting')
except MediaStreamDidNotInitializeError as e:
self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e))
except MediaStreamDidFailError as e:
self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e))
except InvitationDisconnectedError as e:
self._fail_proposal(originator='remote', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError as e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
proposed_streams = self.proposed_streams
self.proposed_streams = None
self.streams = self.streams + streams
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams))
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
finally:
self.greenlet = None
@transition_state('received_proposal', 'rejecting_proposal')
@run_in_green_thread
def reject_proposal(self, code=488, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1, None):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown'))
break
except SIPCoreError as e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
proposed_streams = self.proposed_streams
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams))
if self._hold_in_progress:
self._send_hold()
finally:
self.greenlet = None
def add_stream(self, stream):
self.add_streams([stream])
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def add_streams(self, streams):
streams = list(set(streams).difference(self.streams))
if not streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
unhandled_notifications = []
self.proposed_streams = streams
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
# This is actually the only reason for which this notification could be received
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self._fail_proposal(originator='local', error='received stream proposal')
self.handle_notification(notification)
return
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.proposed_streams:
# Try to reuse a disabled media stream to avoid an ever-growing SDP
try:
index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0)
reuse_media = True
except StopIteration:
index = len(local_sdp.media)
reuse_media = False
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
if reuse_media:
local_sdp.media[index] = media
else:
local_sdp.media.append(media)
self._invitation.send_reinvite(sdp=local_sdp)
notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:]))
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(settings.sip.invite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
else:
self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if notification.data.code >= 300:
proposed_streams = self.proposed_streams
for stream in proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
return
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.cancel_proposal()
return
accepted_streams = []
for stream in self.proposed_streams:
try:
remote_media = remote_sdp.media[stream.index]
except IndexError:
self._fail_proposal(originator='local', error='SDP media missing in answer')
return
else:
if remote_media.port:
stream.start(local_sdp, remote_sdp, stream.index)
accepted_streams.append(stream)
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
wait_count = len(accepted_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except api.TimeoutError:
self._fail_proposal(originator='local', error='media stream timed-out while starting')
except MediaStreamDidNotInitializeError as e:
self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e))
except MediaStreamDidFailError as e:
self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e))
except InvitationDisconnectedError as e:
self._fail_proposal(originator='local', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError as e:
self._fail_proposal(originator='local', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams += accepted_streams
proposed_streams = self.proposed_streams
self.proposed_streams = None
any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams)
if any_stream_ice:
self._reinvite_after_ice()
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams))
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
finally:
self.greenlet = None
def remove_stream(self, stream):
self.remove_streams([stream])
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def remove_streams(self, streams):
streams = list(set(streams).intersection(self.streams))
if not streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
self.streams.remove(stream)
media = local_sdp.media[stream.index]
media.port = 0
media.attributes = []
media.bandwidth_info = []
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if not (200 <= notification.data.code < 300):
break
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError as e:
for stream in streams:
stream.end()
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError):
for stream in streams:
stream.end()
self.end()
else:
for stream in streams:
stream.end()
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
finally:
self.greenlet = None
@transition_state('sending_proposal', 'cancelling_proposal')
@run_in_green_thread
def cancel_proposal(self):
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if notification.data.code == 487:
proposed_streams = self.proposed_streams or []
for stream in proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.proposed_streams = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
elif notification.data.code == 200:
self.end()
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
break
except SIPCoreError as e:
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
proposed_streams = self.proposed_streams or []
for stream in proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams))
except InvitationDisconnectedError as e:
for stream in self.proposed_streams or []:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.proposed_streams = None
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
else:
for stream in self.proposed_streams or []:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
finally:
self.greenlet = None
if self._hold_in_progress:
self._send_hold()
@run_in_green_thread
def hold(self):
if self.on_hold or self._hold_in_progress:
return
self._hold_in_progress = True
streams = (self.streams or []) + (self.proposed_streams or [])
if not streams:
return
for stream in streams:
stream.hold()
if self.state == 'connected':
self._send_hold()
@run_in_green_thread
def unhold(self):
if not self.on_hold and not self._hold_in_progress:
return
self._hold_in_progress = False
streams = (self.streams or []) + (self.proposed_streams or [])
if not streams:
return
for stream in streams:
stream.unhold()
if self.state == 'connected':
self._send_unhold()
@run_in_green_thread
def end(self):
if self.state in (None, 'terminating', 'terminated'):
return
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = None
notification_center = NotificationCenter()
if self._invitation is None:
# The invitation was not yet constructed
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
return
elif self._invitation.state is None:
# The invitation was built but never sent
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
stream.end()
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
return
invitation_state = self._invitation.state
if invitation_state in ('disconnecting', 'disconnected'):
return
self.greenlet = api.getcurrent()
self.state = 'terminating'
if invitation_state == 'connected':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local'))
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
cancelling = invitation_state != 'connected' and self.direction == 'outgoing'
try:
self._invitation.end(timeout=1)
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if notification.data.disconnect_reason in ('internal error', 'missing ACK'):
pass
elif notification.data.disconnect_reason == 'timeout':
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local' if self.direction=='outgoing' else 'remote', method='INVITE', code=408, reason='Timeout'))
elif cancelling:
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif hasattr(notification.data, 'method'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200]))
elif notification.data.disconnect_reason == 'user request':
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason))
break
except SIPCoreError as e:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
else:
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e)))
except InvitationDisconnectedError as e:
# As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state == 'connected':
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200]))
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200]))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
if e.data.originator == 'remote':
code = e.data.code
reason = e.data.reason
elif e.data.disconnect_reason == 'timeout':
code = 408
reason = 'timeout'
else:
code = 0
reason = None
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=code, reason=reason))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
else:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
else:
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request'))
finally:
for stream in streams:
stream.end()
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
@check_state(['connected'])
@check_transfer_state(None, None)
@run_in_twisted_thread
def transfer(self, target_uri, replaced_session=None):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferNewOutgoing', self, NotificationData(transfer_destination=target_uri))
try:
self._invitation.transfer(target_uri, replaced_session.dialog_id if replaced_session is not None else None)
except SIPCoreError as e:
notification_center.post_notification('SIPSessionTransferDidFail', sender=self, data=NotificationData(code=500, reason=str(e)))
@check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal'])
@check_transfer_state('incoming', 'starting')
def accept_transfer(self):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferDidStart', sender=self)
@check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal'])
@check_transfer_state('incoming', 'starting')
def reject_transfer(self, code=603, reason=None):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferDidFail', self, NotificationData(code=code, reason=reason or sip_status_messages[code]))
@property
def dialog_id(self):
return self._invitation.dialog_id if self._invitation is not None else None
@property
def local_identity(self):
if self._invitation is not None and self._invitation.local_identity is not None:
return self._invitation.local_identity
else:
return self._local_identity
@property
def peer_address(self):
return self._invitation.peer_address if self._invitation is not None else None
@property
def remote_identity(self):
if self._invitation is not None and self._invitation.remote_identity is not None:
return self._invitation.remote_identity
else:
return self._remote_identity
@property
def remote_user_agent(self):
return self._invitation.remote_user_agent if self._invitation is not None else None
def _cancel_hold(self):
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if notification.data.code == 200:
self.end()
return False
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
break
except SIPCoreError as e:
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return False
return True
def _send_hold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return
except api.TimeoutError:
if not self._cancel_hold():
return
except SIPCoreError:
pass
self.greenlet = None
self.on_hold = True
self.state = 'connected'
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams)))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._hold_in_progress = False
else:
for stream in self.streams:
stream.unhold()
self._send_unhold()
def _send_unhold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return
except api.TimeoutError:
if not self._cancel_hold():
return
except SIPCoreError:
pass
self.greenlet = None
self.on_hold = False
self.state = 'connected'
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
for stream in self.streams:
stream.hold()
self._send_hold()
def _fail(self, originator, code, reason, error, reason_header=None):
notification_center = NotificationCenter()
prev_inv_state = self._invitation.state
self.state = 'terminating'
if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'):
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator))
if self._invitation.state not in (None, 'disconnecting', 'disconnected'):
try:
if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'):
if 400<=code<=699 and reason is not None:
self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else [])
else:
self._invitation.end(extra_headers=[reason_header] if reason_header is not None else [])
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if prev_inv_state in ('connecting', 'connected'):
if notification.data.disconnect_reason in ('timeout', 'missing ACK'):
sip_code = 200
sip_reason = 'OK'
originator = 'local'
elif hasattr(notification.data, 'method'):
sip_code = 200
sip_reason = 'OK'
originator = 'remote'
else:
sip_code = notification.data.code
sip_reason = notification.data.reason
originator = 'local'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=originator, method='BYE', code=sip_code, reason=sip_reason))
elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'):
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=reason, ack_received=ack_received))
elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled'))
break
except SIPCoreError:
pass
except api.TimeoutError:
if prev_inv_state in ('connecting', 'connected'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408]))
notification_center.remove_observer(self, sender=self._invitation)
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None))
self.greenlet = None
def _fail_proposal(self, originator, error):
notification_center = NotificationCenter()
for stream in self.proposed_streams:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
# _fail_proposal can be called from reject_proposal, which means the stream will
# not have been initialized or the session registered as an observer for it.
pass
else:
stream.deactivate()
stream.end()
if originator == 'remote' and self._invitation.sub_state == 'received_proposal':
try:
self._invitation.send_response(488 if self.proposed_streams else 500)
except SIPCoreError:
pass
else:
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown'))
notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:]))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
@run_in_green_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPInvitationChangedState(self, notification):
if self.state == 'terminated':
return
if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'):
contact_header = notification.data.headers.get('Contact', None)
if contact_header and 'isfocus' in contact_header[0].parameters:
self.remote_focus = True
if self.greenlet is not None:
if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting':
self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data))
else:
self._channel.send(notification)
else:
self.greenlet = api.getcurrent()
unhandled_notifications = []
try:
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self.state = 'received_proposal'
try:
proposed_remote_sdp = self._invitation.sdp.proposed_remote
active_remote_sdp = self._invitation.sdp.active_remote
if len(proposed_remote_sdp.media) < len(active_remote_sdp.media):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')])
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
for stream in self.streams:
if not stream.validate_update(proposed_remote_sdp, stream.index):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)])
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
added_media_indexes = set()
removed_media_indexes = set()
reused_media_indexes = set()
for index, media_stream in enumerate(proposed_remote_sdp.media):
if index >= len(active_remote_sdp.media):
added_media_indexes.add(index)
elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0:
removed_media_indexes.add(index)
elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0:
reused_media_indexes.add(index)
elif media_stream.media != active_remote_sdp.media[index].media:
added_media_indexes.add(index)
removed_media_indexes.add(index)
if added_media_indexes | reused_media_indexes and removed_media_indexes:
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')])
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
elif added_media_indexes | reused_media_indexes:
self.proposed_streams = []
for index in added_media_indexes | reused_media_indexes:
media_stream = proposed_remote_sdp.media[index]
if media_stream.port != 0:
for stream_type in MediaStreamRegistry:
try:
stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index)
except UnknownStreamError:
continue
except InvalidStreamError as e:
log.error("Invalid stream: {}".format(e))
break
except Exception as e:
log.exception("Exception occurred while setting up stream from SDP: {}".format(e))
break
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self._invitation.send_response(100)
notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:]))
else:
self._invitation.send_response(488)
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
else:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes]
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
media = local_sdp.media[stream.index]
media.port = 0
media.attributes = []
media.bandwidth_info = []
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index)
try:
self._invitation.send_response(200, sdp=local_sdp)
except PJSIPError:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
if removed_streams:
self.end()
return
else:
try:
self._invitation.send_response(488)
except PJSIPError:
self.end()
return
else:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
else:
unhandled_notifications.append(notification)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
if removed_media_indexes:
notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams))
except InvitationDisconnectedError as e:
self.greenlet = None
self.state = 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
self.end()
else:
self.state = 'connected'
elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request':
self.state = 'received_proposal_request'
try:
# An empty proposal was received, generate an offer
self._invitation.send_response(100)
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip)
if local_sdp.connection is not None:
local_sdp.connection.address = connection_address
for index, stream in enumerate(self.streams):
stream.reset(index)
media = stream.get_local_media(remote_sdp=None, index=index)
if media.connection is not None:
media.connection.address = connection_address
local_sdp.media[stream.index] = media
self._invitation.send_response(200, sdp=local_sdp)
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
else:
unhandled_notifications.append(notification)
except InvitationDisconnectedError as e:
self.greenlet = None
self.state = 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
raise # FIXME
else:
self.state = 'connected'
elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal':
if notification.data.originator == 'local' and notification.data.code == 487:
proposed_streams = self.proposed_streams
self.proposed_streams = None
self.state = 'connected'
notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
if self._hold_in_progress:
self._send_hold()
elif notification.data.state == 'disconnected':
if self.state == 'incoming':
self.state = 'terminated'
if notification.data.originator == 'remote':
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
# There must have been an error involved
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
self.state = 'terminated'
notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator))
for stream in self.streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if notification.data.originator == 'remote':
if hasattr(notification.data, 'method'):
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200]))
else:
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason))
self.end_time = ISOTimestamp.now()
notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason))
notification.center.remove_observer(self, sender=self._invitation)
finally:
self.greenlet = None
for notification in unhandled_notifications:
self.handle_notification(notification)
def _NH_SIPInvitationGotSDPUpdate(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidInitialize(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_RTPStreamDidEnableEncryption(self, notification):
if notification.sender.type != 'audio':
return
audio_stream = notification.sender
if audio_stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
video_stream = next(stream for stream in self.streams or [] if stream.type=='video')
except StopIteration:
return
if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active:
video_stream.encryption.zrtp._enable(audio_stream)
def _NH_MediaStreamDidStart(self, notification):
stream = notification.sender
if stream.type == 'audio' and stream.encryption.type == 'ZRTP':
stream.encryption.zrtp._enable()
elif stream.type == 'video' and stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio')
except StopIteration:
pass
else:
if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active:
stream.encryption.zrtp._enable(audio_stream)
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
if self.greenlet is not None and self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data))
def _NH_MediaStreamDidFail(self, notification):
if self.greenlet is not None:
if self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data))
else:
stream = notification.sender
if self.streams == [stream]:
self.end()
else:
try:
self.remove_stream(stream)
except IllegalStateError:
self.end()
@implementer(IObserver)
class SessionManager(object, metaclass=Singleton):
def __init__(self):
self.sessions = []
self.state = None
self._channel = coros.queue()
def start(self):
self.state = 'starting'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillStart', sender=self)
notification_center.add_observer(self, 'SIPInvitationChangedState')
notification_center.add_observer(self, 'SIPSessionNewIncoming')
notification_center.add_observer(self, 'SIPSessionNewOutgoing')
notification_center.add_observer(self, 'SIPSessionDidFail')
notification_center.add_observer(self, 'SIPSessionDidEnd')
self.state = 'started'
notification_center.post_notification('SIPSessionManagerDidStart', sender=self)
def stop(self):
self.state = 'stopping'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillEnd', sender=self)
for session in self.sessions:
session.end()
while self.sessions:
self._channel.wait()
notification_center.remove_observer(self, 'SIPInvitationChangedState')
notification_center.remove_observer(self, 'SIPSessionNewIncoming')
notification_center.remove_observer(self, 'SIPSessionNewOutgoing')
notification_center.remove_observer(self, 'SIPSessionDidFail')
notification_center.remove_observer(self, 'SIPSessionDidEnd')
self.state = 'stopped'
notification_center.post_notification('SIPSessionManagerDidEnd', sender=self)
@run_in_twisted_thread
def handle_notification(self, notification):
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming':
account_manager = AccountManager()
account = account_manager.find_account(notification.data.request_uri)
if account is None:
notification.sender.send_response(404)
return
notification.sender.send_response(100)
session = Session(account)
session.init_incoming(notification.sender, notification.data)
elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'):
self.sessions.append(notification.sender)
elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'):
self.sessions.remove(notification.sender)
if self.state == 'stopping':
self._channel.send(notification)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:50 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408886
Default Alt Text
(274 KB)

Event Timeline