Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/application.py b/sipsimple/application.py
index d941853d..301a2edc 100644
--- a/sipsimple/application.py
+++ b/sipsimple/application.py
@@ -1,553 +1,552 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a high-level application responsable for starting and stopping
various sub-systems required to implement a fully featured SIP User Agent
application.
"""
from __future__ import absolute_import
__all__ = ["SIPApplication"]
from threading import RLock, Thread
from uuid import uuid4
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import classproperty
from application.python.types import Singleton
from eventlib import coros, proc
from twisted.internet import reactor
from xcaplib import client as xcap_client
from zope.interface import implements
from sipsimple.account import Account, AccountManager
from sipsimple.addressbook import AddressbookManager
from sipsimple.audio import AudioDevice, RootAudioBridge
from sipsimple.configuration import ConfigurationManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioMixer, Engine, PJSIPError, SIPCoreError, SIPURI
from sipsimple.lookup import DNSLookup, DNSLookupError, DNSManager
from sipsimple.session import SessionManager
from sipsimple.storage import ISIPSimpleStorage
from sipsimple.threading import ThreadManager, run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
class ApplicationAttribute(object):
def __init__(self, value):
self.value = value
def __get__(self, obj, objtype):
return self.value
def __set__(self, obj, value):
self.value = value
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
class SIPApplication(object):
__metaclass__ = Singleton
implements(IObserver)
storage = ApplicationAttribute(value=None)
state = ApplicationAttribute(value=None)
end_reason = ApplicationAttribute(value=None)
alert_audio_device = ApplicationAttribute(value=None)
alert_audio_bridge = ApplicationAttribute(value=None)
voice_audio_device = ApplicationAttribute(value=None)
voice_audio_bridge = ApplicationAttribute(value=None)
_channel = ApplicationAttribute(value=coros.queue())
_nat_detect_channel = ApplicationAttribute(value=coros.queue())
_wakeup_timer = ApplicationAttribute(value=None)
_lock = ApplicationAttribute(value=RLock())
engine = Engine()
local_nat_type = ApplicationAttribute(value='unknown')
running = classproperty(lambda cls: cls.state == 'started')
alert_audio_mixer = classproperty(lambda cls: cls.alert_audio_bridge.mixer if cls.alert_audio_bridge else None)
voice_audio_mixer = classproperty(lambda cls: cls.voice_audio_bridge.mixer if cls.voice_audio_bridge else None)
def start(self, storage):
if not ISIPSimpleStorage.providedBy(storage):
raise TypeError("storage must implement the ISIPSimpleStorage interface")
with self._lock:
if self.state is not None:
raise RuntimeError("SIPApplication cannot be started from '%s' state" % self.state)
self.state = 'starting'
self.storage = storage
thread_manager = ThreadManager()
thread_manager.start()
configuration_manager = ConfigurationManager()
addressbook_manager = AddressbookManager()
account_manager = AccountManager()
# load configuration
try:
configuration_manager.start()
SIPSimpleSettings()
account_manager.load()
addressbook_manager.load()
except:
self.state = None
self.storage = None
raise
# start the reactor thread
self.thread = Thread(name='Reactor Thread', target=self._run_reactor)
self.thread.start()
def stop(self):
with self._lock:
if self.state in (None, 'stopping', 'stopped'):
return
prev_state = self.state
self.state = 'stopping'
self.end_reason = 'application request'
notification_center = NotificationCenter()
notification_center.post_notification('SIPApplicationWillEnd', sender=self)
if prev_state != 'starting':
self._shutdown_subsystems()
def _run_reactor(self):
from eventlib.twistedutil import join_reactor
notification_center = NotificationCenter()
self._initialize_subsystems()
reactor.run(installSignalHandlers=False)
self.state = 'stopped'
notification_center.post_notification('SIPApplicationDidEnd', sender=self, data=NotificationData(end_reason=self.end_reason))
@run_in_green_thread
def _initialize_subsystems(self):
account_manager = AccountManager()
addressbook_manager = AddressbookManager()
dns_manager = DNSManager()
engine = Engine()
notification_center = NotificationCenter()
session_manager = SessionManager()
settings = SIPSimpleSettings()
xcap_client.DEFAULT_HEADERS = {'User-Agent': settings.user_agent}
notification_center.post_notification('SIPApplicationWillStart', sender=self)
if self.state == 'stopping':
reactor.stop()
return
account = account_manager.default_account
# initialize core
notification_center.add_observer(self, sender=engine)
options = dict(# general
user_agent=settings.user_agent,
# SIP
detect_sip_loops=True,
udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None,
tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None,
tls_port=None,
# TLS
tls_verify_server=False,
tls_ca_file=None,
tls_cert_file=None,
tls_privkey_file=None,
tls_timeout=3000,
# rtp
rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end),
# audio
codecs=list(settings.rtp.audio_codec_list),
# logging
log_level=settings.logs.pjsip_level,
trace_sip=True,
)
try:
engine.start(**options)
except SIPCoreError:
self.end_reason = 'engine failed'
reactor.stop()
return
# initialize TLS
try:
engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None,
verify_server=account.tls.verify_server if account else False,
ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
timeout=settings.tls.timeout)
except Exception, e:
notification_center = NotificationCenter()
notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
# initialize PJSIP internal resolver
engine.set_nameservers(dns_manager.nameservers)
# initialize audio objects
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
try:
voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, settings.audio.tail_length)
except SIPCoreError:
try:
voice_mixer = AudioMixer(u'system_default', u'system_default', settings.audio.sample_rate, settings.audio.tail_length)
except SIPCoreError:
voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, settings.audio.tail_length)
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
try:
alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0)
except SIPCoreError:
try:
alert_mixer = AudioMixer(None, u'system_default', settings.audio.sample_rate, 0)
except SIPCoreError:
alert_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0)
if settings.audio.silent:
alert_mixer.output_volume = 0
self.alert_audio_device = AudioDevice(alert_mixer)
self.alert_audio_bridge = RootAudioBridge(alert_mixer)
self.alert_audio_bridge.add(self.alert_audio_device)
settings.audio.input_device = voice_mixer.input_device
settings.audio.output_device = voice_mixer.output_device
settings.audio.alert_device = alert_mixer.output_device
settings.save()
# initialize instance id
if not settings.instance_id:
settings.instance_id = uuid4().urn
settings.save()
# initialize middleware components
dns_manager.start()
account_manager.start()
addressbook_manager.start()
session_manager.start()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange')
notification_center.add_observer(self, name='SIPEngineDetectedNATType')
notification_center.add_observer(self, name='DNSNameserversDidChange')
notification_center.add_observer(self, name='SystemIPAddressDidChange')
notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
self.state = 'started'
notification_center.post_notification('SIPApplicationDidStart', sender=self)
self._detect_nat_type()
self._nat_detect_channel.send(Command('detect_nat'))
@run_in_green_thread
def _shutdown_subsystems(self):
# cleanup internals
if self._wakeup_timer is not None and self._wakeup_timer.active():
self._wakeup_timer.cancel()
self._wakeup_timer = None
# shutdown middleware components
dns_manager = DNSManager()
account_manager = AccountManager()
addressbook_manager = AddressbookManager()
session_manager = SessionManager()
procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(addressbook_manager.stop), proc.spawn(session_manager.stop)]
proc.waitall(procs)
# shutdown engine
engine = Engine()
engine.stop()
while True:
notification = self._channel.wait()
if notification.name == 'SIPEngineDidEnd':
break
# stop threads
thread_manager = ThreadManager()
thread_manager.stop()
# stop the reactor
reactor.stop()
@run_in_green_thread
def _detect_nat_type(self):
account_manager = AccountManager()
engine = Engine()
lookup = DNSLookup()
serial = 0
while True:
restart_detection = False
command = self._nat_detect_channel.wait()
if command.name != 'detect_nat':
continue
continue # disable NAT detection for now as it is not used anywhere -Dan
stun_locators = list(account.nat_traversal.stun_server_list or account.id.domain for account in account_manager.iter_accounts() if isinstance(account, Account))
for stun_item in stun_locators:
if isinstance(stun_item, basestring):
try:
stun_servers = lookup.lookup_service(SIPURI(host=stun_item), 'stun').wait()
except DNSLookupError:
continue
else:
stun_servers = [(server.host, server.port) for server in stun_item]
for stun_server, stun_port in stun_servers:
serial += 1
try:
engine.detect_nat_type(stun_server, stun_port, user_data=serial)
except PJSIPError:
continue
command = self._nat_detect_channel.wait()
if command.name == 'process_nat_detection' and command.data.user_data == serial and command.data.succeeded:
self.local_nat_type = command.data.nat_type.lower()
restart_detection = True
break
elif command.name == 'detect_nat':
self._nat_detect_channel.send(command)
restart_detection = True
break
if restart_detection:
break
else:
self.local_nat_type = 'unknown'
reactor.callLater(60, self._nat_detect_channel.send, Command('detect_nat'))
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPEngineDidEnd(self, notification):
self._channel.send(notification)
def _NH_SIPEngineDidFail(self, notification):
if not self.running:
return
self.end_reason = 'engine failed'
reactor.stop()
@run_in_thread('device-io')
def _NH_CFGSettingsObjectDidChange(self, notification):
engine = Engine()
settings = SIPSimpleSettings()
account_manager = AccountManager()
if notification.sender is settings:
if 'audio.sample_rate' in notification.data.modified:
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
try:
voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, settings.audio.tail_length)
except SIPCoreError:
try:
voice_mixer = AudioMixer(u'system_default', u'system_default', settings.audio.sample_rate, settings.audio.tail_length)
except SIPCoreError:
voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, settings.audio.tail_length)
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
try:
alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0)
except SIPCoreError:
try:
alert_mixer = AudioMixer(None, u'system_default', settings.audio.sample_rate, 0)
except SIPCoreError:
alert_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0)
self.alert_audio_device = AudioDevice(alert_mixer)
self.alert_audio_bridge = RootAudioBridge(alert_mixer)
self.alert_audio_bridge.add(self.alert_audio_device)
if settings.audio.silent:
alert_mixer.output_volume = 0
settings.audio.input_device = voice_mixer.input_device
settings.audio.output_device = voice_mixer.output_device
settings.audio.alert_device = alert_mixer.output_device
settings.save()
else:
if set(['audio.input_device', 'audio.output_device', 'audio.alert_device', 'audio.tail_length']).intersection(notification.data.modified):
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
if input_device != self.voice_audio_bridge.mixer.input_device or output_device != self.voice_audio_bridge.mixer.output_device or 'audio.tail_length' in notification.data.modified:
try:
self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, settings.audio.tail_length)
except SIPCoreError:
try:
self.voice_audio_bridge.mixer.set_sound_devices(u'system_default', u'system_default', settings.audio.tail_length)
except SIPCoreError:
self.voice_audio_bridge.mixer.set_sound_devices(None, None, settings.audio.tail_length)
settings.audio.input_device = self.voice_audio_bridge.mixer.input_device
settings.audio.output_device = self.voice_audio_bridge.mixer.output_device
settings.save()
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
if alert_device != self.alert_audio_bridge.mixer.output_device:
try:
self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0)
except SIPCoreError:
try:
self.alert_audio_bridge.mixer.set_sound_devices(None, u'system_default', 0)
except SIPCoreError:
self.alert_audio_bridge.mixer.set_sound_devices(None, None, 0)
settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device
settings.save()
if 'audio.silent' in notification.data.modified:
if settings.audio.silent:
self.alert_audio_bridge.mixer.output_volume = 0
else:
self.alert_audio_bridge.mixer.output_volume = 100
if 'user_agent' in notification.data.modified:
engine.user_agent = settings.user_agent
if 'sip.udp_port' in notification.data.modified:
engine.set_udp_port(settings.sip.udp_port)
if 'sip.tcp_port' in notification.data.modified:
engine.set_tcp_port(settings.sip.tcp_port)
- if set(('sip.tls_port', 'tls.protocol', 'tls.ca_list', 'tls.timeout', 'default_account')).intersection(notification.data.modified):
+ if set(('sip.tls_port', 'tls.ca_list', 'tls.timeout', 'default_account')).intersection(notification.data.modified):
account = account_manager.default_account
try:
engine.set_tls_options(port=settings.sip.tls_port,
- protocol=settings.tls.protocol,
verify_server=account.tls.verify_server if account else False,
ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
timeout=settings.tls.timeout)
except Exception, e:
notification_center = NotificationCenter()
notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
if 'rtp.port_range' in notification.data.modified:
engine.rtp_port_range = (settings.rtp.port_range.start, settings.rtp.port_range.end)
if 'rtp.audio_codec_list' in notification.data.modified:
engine.codecs = list(settings.rtp.audio_codec_list)
if 'logs.pjsip_level' in notification.data.modified:
engine.log_level = settings.logs.pjsip_level
elif notification.sender is account_manager.default_account:
if set(('tls.verify_server', 'tls.certificate')).intersection(notification.data.modified):
account = account_manager.default_account
try:
engine.set_tls_options(port=settings.sip.tls_port,
verify_server=account.tls.verify_server,
ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
cert_file=account.tls.certificate.normalized if account.tls.certificate else None,
privkey_file=account.tls.certificate.normalized if account.tls.certificate else None,
timeout=settings.tls.timeout)
except Exception, e:
notification_center = NotificationCenter()
notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
@run_in_thread('device-io')
def _NH_DefaultAudioDeviceDidChange(self, notification):
if None in (self.voice_audio_bridge, self.alert_audio_bridge):
return
settings = SIPSimpleSettings()
current_input_device = self.voice_audio_bridge.mixer.input_device
current_output_device = self.voice_audio_bridge.mixer.output_device
current_alert_device = self.alert_audio_bridge.mixer.output_device
ec_tail_length = self.voice_audio_bridge.mixer.ec_tail_length
if notification.data.changed_input and u'system_default' in (current_input_device, settings.audio.input_device):
try:
self.voice_audio_bridge.mixer.set_sound_devices(u'system_default', current_output_device, ec_tail_length)
except SIPCoreError:
self.voice_audio_bridge.mixer.set_sound_devices(None, None, ec_tail_length)
if notification.data.changed_output and u'system_default' in (current_output_device, settings.audio.output_device):
try:
self.voice_audio_bridge.mixer.set_sound_devices(current_input_device, u'system_default', ec_tail_length)
except SIPCoreError:
self.voice_audio_bridge.mixer.set_sound_devices(None, None, ec_tail_length)
if notification.data.changed_output and u'system_default' in (current_alert_device, settings.audio.alert_device):
try:
self.alert_audio_bridge.mixer.set_sound_devices(None, u'system_default', 0)
except SIPCoreError:
self.alert_audio_bridge.mixer.set_sound_devices(None, None, 0)
@run_in_thread('device-io')
def _NH_AudioDevicesDidChange(self, notification):
old_devices = set(notification.data.old_devices)
new_devices = set(notification.data.new_devices)
removed_devices = old_devices - new_devices
if not removed_devices:
return
input_device = self.voice_audio_bridge.mixer.input_device
output_device = self.voice_audio_bridge.mixer.output_device
alert_device = self.alert_audio_bridge.mixer.output_device
if self.voice_audio_bridge.mixer.real_input_device in removed_devices:
input_device = u'system_default' if new_devices else None
if self.voice_audio_bridge.mixer.real_output_device in removed_devices:
output_device = u'system_default' if new_devices else None
if self.alert_audio_bridge.mixer.real_output_device in removed_devices:
alert_device = u'system_default' if new_devices else None
try:
self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, self.voice_audio_bridge.mixer.ec_tail_length)
except SIPCoreError:
try:
self.voice_audio_bridge.mixer.set_sound_devices(u'system_default', u'system_default', self.voice_audio_bridge.mixer.ec_tail_length)
except SIPCoreError:
self.voice_audio_bridge.mixer.set_sound_devices(None, None, self.voice_audio_bridge.mixer.ec_tail_length)
try:
self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0)
except SIPCoreError:
try:
self.alert_audio_bridge.mixer.set_sound_devices(None, u'system_default', 0)
except SIPCoreError:
self.alert_audio_bridge.mixer.set_sound_devices(None, None, 0)
settings = SIPSimpleSettings()
settings.audio.input_device = self.voice_audio_bridge.mixer.input_device
settings.audio.output_device = self.voice_audio_bridge.mixer.output_device
settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device
settings.save()
def _NH_DNSNameserversDidChange(self, notification):
if self.running:
self._nat_detect_channel.send(Command('detect_nat'))
engine = Engine()
engine.set_nameservers(notification.data.nameservers)
def _NH_SystemIPAddressDidChange(self, notification):
if self.running:
self._nat_detect_channel.send(Command('detect_nat'))
def _NH_SystemDidWakeUpFromSleep(self, notification):
if self.running and self._wakeup_timer is None:
def wakeup_action():
if self.running:
self._nat_detect_channel.send(Command('detect_nat'))
self._wakeup_timer = None
self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
def _NH_SIPEngineDetectedNATType(self, notification):
self._nat_detect_channel.send(Command('process_nat_detection', data=notification.data))

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:49 AM (1 d, 9 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408866
Default Alt Text
(27 KB)

Event Timeline