Page MenuHomePhabricator

No OneTemporary

This file is larger than 256 KB, so syntax highlighting was skipped.
diff --git a/sipsimple/account/bonjour/__init__.py b/sipsimple/account/bonjour/__init__.py
index fcd710d6..acdcc56a 100644
--- a/sipsimple/account/bonjour/__init__.py
+++ b/sipsimple/account/bonjour/__init__.py
@@ -1,506 +1,491 @@
# Copyright (C) 2008-2012 AG Projects. See LICENSE for details.
#
"""Implements Bonjour service handlers"""
__all__ = ['BonjourServices']
import re
import uuid
from threading import Lock
from weakref import WeakKeyDictionary
from application import log
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from eventlib import api, coros, proc
from eventlib.green import select
from twisted.internet import reactor
from zope.interface import implements
from sipsimple.account.bonjour import _bonjour
from sipsimple.core import FrozenSIPURI, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread
from sipsimple.threading.green import Command
class RestartSelect(Exception): pass
class BonjourFile(object):
__instances__ = WeakKeyDictionary()
def __new__(cls, file):
if cls is BonjourFile:
raise TypeError("BonjourFile cannot be instantiated directly")
instance = cls.__instances__.get(file)
if instance is None:
instance = object.__new__(cls)
instance.file = file
instance.active = False
cls.__instances__[file] = instance
return instance
def fileno(self):
return self.file.fileno() if not self.closed else -1
def close(self):
self.file.close()
self.file = None
@property
def closed(self):
return self.file is None
@classmethod
def find_by_file(cls, file):
"""Return the instance matching the given DNSServiceRef file"""
try:
return cls.__instances__[file]
except KeyError:
raise KeyError("cannot find a %s matching the given DNSServiceRef file" % cls.__name__)
class BonjourDiscoveryFile(BonjourFile):
def __new__(cls, file, transport):
instance = BonjourFile.__new__(cls, file)
instance.transport = transport
return instance
class BonjourRegistrationFile(BonjourFile):
def __new__(cls, file, transport):
instance = BonjourFile.__new__(cls, file)
instance.transport = transport
return instance
class BonjourResolutionFile(BonjourFile):
def __new__(cls, file, discovery_file, service_description):
instance = BonjourFile.__new__(cls, file)
instance.discovery_file = discovery_file
instance.service_description = service_description
return instance
@property
def transport(self):
return self.discovery_file.transport
class BonjourServiceDescription(object):
def __init__(self, name, type, domain):
self.name = name
self.type = type
self.domain = domain
def __repr__(self):
return "%s(%r, %r, %r)" % (self.__class__.__name__, self.name, self.type, self.domain)
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if isinstance(other, BonjourServiceDescription):
return self.name==other.name and self.type==other.type and self.domain==other.domain
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
class BonjourNeighbourPresence(object):
def __init__(self, state, note):
self.state = state
self.note = note
class BonjourNeighbourRecord(object):
def __init__(self, service_description, host, txtrecord):
self.id = txtrecord.get('instance_id', None)
self.name = txtrecord.get('name', '').decode('utf-8') or None
self.host = re.match(r'^(?P<host>.*?)(\.local)?\.?$', host).group('host')
self.uri = FrozenSIPURI.parse(txtrecord.get('contact', service_description.name))
self.presence = BonjourNeighbourPresence(txtrecord.get('state', txtrecord.get('status', None)), txtrecord.get('note', '').decode('utf-8') or None) # status is read for legacy (remove later) -Dan
class BonjourServices(object):
implements(IObserver)
def __init__(self, account):
self.account = account
self._started = False
self._files = []
self._neighbours = {}
self._command_channel = coros.queue()
self._select_proc = None
self._discover_timer = None
self._register_timer = None
self._update_timer = None
- self._wakeup_timer = None
self._lock = Lock()
self.__dict__['presence_state'] = None
def start(self):
notification_center = NotificationCenter()
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._select_proc = proc.spawn(self._process_files)
proc.spawn(self._handle_commands)
def stop(self):
notification_center = NotificationCenter()
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._select_proc.kill()
self._command_channel.send_exception(api.GreenletExit)
def activate(self):
self._started = True
self._command_channel.send(Command('register'))
self._command_channel.send(Command('discover'))
def deactivate(self):
command = Command('stop')
self._command_channel.send(command)
command.wait()
self._started = False
def restart_discovery(self):
self._command_channel.send(Command('discover'))
def restart_registration(self):
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def update_registrations(self):
self._command_channel.send(Command('update_registrations'))
def _get_presence_state(self):
return self.__dict__['presence_state']
def _set_presence_state(self, state):
if state is not None and not isinstance(state, BonjourPresenceState):
raise ValueError("state must be a BonjourPresenceState instance or None")
with self._lock:
old_state = self.__dict__['presence_state']
self.__dict__['presence_state'] = state
if state != old_state:
call_in_twisted_thread(self.update_registrations)
presence_state = property(_get_presence_state, _set_presence_state)
del _get_presence_state, _set_presence_state
def _register_cb(self, file, flags, error_code, name, regtype, domain):
notification_center = NotificationCenter()
file = BonjourRegistrationFile.find_by_file(file)
if error_code == _bonjour.kDNSServiceErr_NoError:
notification_center.post_notification('BonjourAccountRegistrationDidSucceed', sender=self.account, data=NotificationData(name=name, transport=file.transport))
else:
error = _bonjour.BonjourError(error_code)
notification_center.post_notification('BonjourAccountRegistrationDidFail', sender=self.account, data=NotificationData(reason=str(error), transport=file.transport))
self._files.remove(file)
self._select_proc.kill(RestartSelect)
file.close()
if self._register_timer is None:
self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register'))
def _browse_cb(self, file, flags, interface_index, error_code, service_name, regtype, reply_domain):
notification_center = NotificationCenter()
file = BonjourDiscoveryFile.find_by_file(file)
service_description = BonjourServiceDescription(service_name, regtype, reply_domain)
if error_code != _bonjour.kDNSServiceErr_NoError:
error = _bonjour.BonjourError(error_code)
notification_center.post_notification('BonjourAccountDiscoveryDidFail', sender=self.account, data=NotificationData(reason=str(error), transport=file.transport))
removed_files = [file] + [f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file]
for f in removed_files:
self._files.remove(f)
self._select_proc.kill(RestartSelect)
for f in removed_files:
f.close()
if self._discover_timer is None:
self._discover_timer = reactor.callLater(1, self._command_channel.send, Command('discover'))
return
if reply_domain != 'local.':
return
if flags & _bonjour.kDNSServiceFlagsAdd:
try:
resolution_file = (f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file and f.service_description==service_description).next()
except StopIteration:
try:
resolution_file = _bonjour.DNSServiceResolve(0, interface_index, service_name, regtype, reply_domain, self._resolve_cb)
except _bonjour.BonjourError, e:
notification_center.post_notification('BonjourAccountDiscoveryFailure', sender=self.account, data=NotificationData(error=str(e), transport=file.transport))
else:
resolution_file = BonjourResolutionFile(resolution_file, discovery_file=file, service_description=service_description)
self._files.append(resolution_file)
self._select_proc.kill(RestartSelect)
else:
try:
resolution_file = (f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file and f.service_description==service_description).next()
except StopIteration:
pass
else:
self._files.remove(resolution_file)
self._select_proc.kill(RestartSelect)
resolution_file.close()
service_description = resolution_file.service_description
if service_description in self._neighbours:
record = self._neighbours.pop(service_description)
notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record))
def _resolve_cb(self, file, flags, interface_index, error_code, fullname, host_target, port, txtrecord):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
file = BonjourResolutionFile.find_by_file(file)
if error_code == _bonjour.kDNSServiceErr_NoError:
service_description = file.service_description
try:
record = BonjourNeighbourRecord(service_description, host_target, _bonjour.TXTRecord.parse(txtrecord))
except SIPCoreError:
pass
else:
transport = record.uri.transport
supported_transport = transport in settings.sip.transport_list and (transport!='tls' or self.account.tls.certificate is not None)
if not supported_transport and service_description in self._neighbours:
record = self._neighbours.pop(service_description)
notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record))
elif supported_transport:
try:
our_contact_uri = self.account.contact[NoGRUU, transport]
except KeyError:
return
if record.uri != our_contact_uri:
had_neighbour = service_description in self._neighbours
self._neighbours[service_description] = record
notification_name = 'BonjourAccountDidUpdateNeighbour' if had_neighbour else 'BonjourAccountDidAddNeighbour'
notification_data = NotificationData(neighbour=service_description, record=record)
notification_center.post_notification(notification_name, sender=self.account, data=notification_data)
else:
self._files.remove(file)
self._select_proc.kill(RestartSelect)
file.close()
error = _bonjour.BonjourError(error_code)
notification_center.post_notification('BonjourAccountDiscoveryFailure', sender=self.account, data=NotificationData(error=str(error), transport=file.transport))
# start a new resolve process here? -Dan
def _process_files(self):
while True:
try:
ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0]
except RestartSelect:
continue
else:
for file in ready:
file.active = True
self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed]))
def _handle_commands(self):
while True:
command = self._command_channel.wait()
if self._started:
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_unregister(self, command):
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
old_files = []
for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)):
old_files.append(file)
self._files.remove(file)
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
notification_center = NotificationCenter()
for transport in set(file.transport for file in self._files):
notification_center.post_notification('BonjourAccountRegistrationDidEnd', sender=self.account, data=NotificationData(transport=transport))
command.signal()
def _CH_register(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None)
registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile))
missing_transports = supported_transports - registered_transports
added_transports = set()
for transport in missing_transports:
notification_center.post_notification('BonjourAccountWillRegister', sender=self.account, data=NotificationData(transport=transport))
try:
contact = self.account.contact[NoGRUU, transport]
instance_id = str(uuid.UUID(settings.instance_id))
txtdata = dict(txtvers=1, name=self.account.display_name.encode('utf-8'), contact="<%s>" % str(contact), instance_id=instance_id)
state = self.account.presence_state
if self.account.presence.enabled and state is not None:
txtdata['state'] = state.state
txtdata['note'] = state.note.encode('utf-8')
file = _bonjour.DNSServiceRegister(name=str(contact),
regtype="_sipuri._%s" % (transport if transport == 'udp' else 'tcp'),
port=contact.port,
callBack=self._register_cb,
txtRecord=_bonjour.TXTRecord(items=txtdata))
except (_bonjour.BonjourError, KeyError), e:
notification_center.post_notification('BonjourAccountRegistrationDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=transport))
else:
self._files.append(BonjourRegistrationFile(file, transport))
added_transports.add(transport)
if added_transports:
self._select_proc.kill(RestartSelect)
if added_transports != missing_transports:
self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event))
else:
command.signal()
def _CH_update_registrations(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
available_transports = settings.sip.transport_list
old_files = []
for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports):
old_files.append(file)
self._files.remove(file)
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
update_failure = False
for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)):
try:
contact = self.account.contact[NoGRUU, file.transport]
instance_id = str(uuid.UUID(settings.instance_id))
txtdata = dict(txtvers=1, name=self.account.display_name.encode('utf-8'), contact="<%s>" % str(contact), instance_id=instance_id)
state = self.account.presence_state
if self.account.presence.enabled and state is not None:
txtdata['state'] = state.state
txtdata['note'] = state.note.encode('utf-8')
_bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0)
except (_bonjour.BonjourError, KeyError), e:
notification_center.post_notification('BonjourAccountRegistrationUpdateDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=file.transport))
update_failure = True
self._command_channel.send(Command('register'))
if update_failure:
self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event))
else:
command.signal()
def _CH_discover(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._discover_timer is not None and self._discover_timer.active():
self._discover_timer.cancel()
self._discover_timer = None
supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None)
discoverable_transports = set('tcp' if transport=='tls' else transport for transport in supported_transports)
old_files = []
for file in (f for f in self._files[:] if isinstance(f, (BonjourDiscoveryFile, BonjourResolutionFile)) and f.transport not in discoverable_transports):
old_files.append(file)
self._files.remove(file)
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
for service_description in [service for service, record in self._neighbours.iteritems() if record.uri.transport not in supported_transports]:
record = self._neighbours.pop(service_description)
notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record))
discovered_transports = set(file.transport for file in self._files if isinstance(file, BonjourDiscoveryFile))
missing_transports = discoverable_transports - discovered_transports
added_transports = set()
for transport in missing_transports:
notification_center.post_notification('BonjourAccountWillInitiateDiscovery', sender=self.account, data=NotificationData(transport=transport))
try:
file = _bonjour.DNSServiceBrowse(regtype="_sipuri._%s" % transport, callBack=self._browse_cb)
except _bonjour.BonjourError, e:
notification_center.post_notification('BonjourAccountDiscoveryDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=transport))
else:
self._files.append(BonjourDiscoveryFile(file, transport))
added_transports.add(transport)
if added_transports:
self._select_proc.kill(RestartSelect)
if added_transports != missing_transports:
self._discover_timer = reactor.callLater(1, self._command_channel.send, Command('discover', command.event))
else:
command.signal()
def _CH_process_results(self, command):
for file in (f for f in command.files if not f.closed):
try:
_bonjour.DNSServiceProcessResult(file.file)
except:
# Should we close the file? The documentation doesn't say anything about this. -Luci
log.err()
for file in command.files:
file.active = False
self._files = [f for f in self._files if not f.closed]
self._select_proc.kill(RestartSelect)
def _CH_stop(self, command):
if self._discover_timer is not None and self._discover_timer.active():
self._discover_timer.cancel()
self._discover_timer = None
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
files = self._files
neighbours = self._neighbours
self._files = []
self._select_proc.kill(RestartSelect)
self._neighbours = {}
for file in files:
file.close()
notification_center = NotificationCenter()
for neighbour, record in neighbours.iteritems():
notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=neighbour, record=record))
for transport in set(file.transport for file in files):
notification_center.post_notification('BonjourAccountRegistrationDidEnd', sender=self.account, data=NotificationData(transport=transport))
command.signal()
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
- def _NH_SystemIPAddressDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self._files:
self.restart_discovery()
self.restart_registration()
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self._files:
- self.restart_discovery()
- self.restart_registration()
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
class BonjourPresenceState(object):
def __init__(self, state, note=None):
self.state = state
self.note = note or u''
def __eq__(self, other):
if isinstance(other, BonjourPresenceState):
return self.state == other.state and self.note == other.note
return NotImplemented
def __ne__(self, other):
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
diff --git a/sipsimple/account/publication.py b/sipsimple/account/publication.py
index 7f171154..152d9d38 100644
--- a/sipsimple/account/publication.py
+++ b/sipsimple/account/publication.py
@@ -1,409 +1,388 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details.
#
"""Implements the publisher handlers"""
__all__ = ['Publisher', 'PresencePublisher', 'DialogPublisher']
import random
from abc import ABCMeta, abstractproperty
from threading import Lock
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.types import MarkerType
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implements
from sipsimple.core import FromHeader, Publication, PublicationETagError, RouteHeader, SIPURI, SIPCoreError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads.dialoginfo import DialogInfoDocument
from sipsimple.payloads.pidf import PIDFDocument
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('publish', refresh_interval=None)
class SameState: __metaclass__ = MarkerType
class SIPPublicationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPPublicationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class PublicationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class PublisherNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-9] if name.endswith('Publisher') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
class Publisher(object):
__metaclass__ = ABCMeta
__nickname__ = PublisherNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
implements(IObserver)
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.publishing = False
self._lock = Lock()
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._publication = None
self._dns_wait = 1
self._publish_wait = 1
self._publication_timer = None
- self._wakeup_timer = None
self.__dict__['state'] = None
@abstractproperty
def event(self):
return None
@abstractproperty
def payload_type(self):
return None
@property
def extra_headers(self):
return []
def _get_state(self):
return self.__dict__['state']
def _set_state(self, state):
if state is not None and not isinstance(state, self.payload_type.root_element):
raise ValueError("state must be a %s document or None" % self.payload_type.root_element.__name__)
with self._lock:
old_state = self.__dict__['state']
self.__dict__['state'] = state
if state == old_state:
return
self._publish(state)
state = property(_get_state, _set_state)
del _get_state, _set_state
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
- notification_center.add_observer(self, name='DNSNameserversDidChange')
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
- notification_center.remove_observer(self, name='DNSNameserversDidChange')
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('publish', state=self.state))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unpublish'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
@run_in_twisted_thread
def _publish(self, state):
if not self.active:
return
if state is None:
self._command_channel.send(Command('unpublish'))
else:
self._command_channel.send(Command('publish', state=state))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_publish(self, command):
if command.state is None or self._publication is None and command.state is SameState:
command.signal()
return
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
if self._publication is None:
duration = command.refresh_interval or self.account.sip.publish_interval
from_header = FromHeader(self.account.uri, self.account.display_name)
self._publication = Publication(from_header, self.event, self.payload_type.content_type, credentials=self.account.credentials, duration=duration, extra_headers=self.extra_headers)
notification_center.add_observer(self, sender=self._publication)
notification_center.post_notification(self.__class__.__name__ + 'WillPublish', sender=self, data=NotificationData(state=command.state, duration=duration))
else:
notification_center.post_notification(self.__class__.__name__ + 'WillRefresh', sender=self, data=NotificationData(state=command.state))
try:
# Lookup routes
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
except DNSLookupError, e:
retry_after = random.uniform(self._dns_wait, 2*self._dns_wait)
self._dns_wait = limit(2*self._dns_wait, max=30)
raise PublicationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
body = None if command.state is SameState else command.state.toxml()
# Publish by trying each route in turn
publish_timeout = time() + 30
for route in routes:
remaining_time = publish_timeout-time()
if remaining_time > 0:
try:
try:
self._publication.publish(body, RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
except PublicationETagError:
state = self.state # access self.state only once to avoid race conditions
if state is not None:
self._publication.publish(state.toxml(), RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
else:
command.signal()
return
except SIPCoreError:
raise PublicationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidSucceed':
break
except SIPPublicationDidFail, e:
if e.data.code == 407:
# Authentication failed, so retry the publication in some time
raise PublicationError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 412:
raise PublicationError('Conditional request failed', retry_after=0)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.publish_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise PublicationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise PublicationError('Method or event not supported', retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.publishing = True
self._publish_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the publication
retry_after = random.uniform(self._publish_wait, 2*self._publish_wait)
self._publish_wait = limit(self._publish_wait*2, max=30)
raise PublicationError('No more routes to try', retry_after=retry_after)
except PublicationError, e:
self.publishing = False
notification_center.remove_observer(self, sender=self._publication)
def publish():
if self.active:
self._command_channel.send(Command('publish', event=command.event, state=self.state, refresh_interval=e.refresh_interval))
else:
command.signal()
self._publication_timer = None
self._publication_timer = reactor.callLater(e.retry_after, publish)
self._publication = None
notification_center.post_notification(self.__nickname__ + 'PublicationDidFail', sender=self, data=NotificationData(reason=e.error))
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidSucceed', sender=self)
def _CH_unpublish(self, command):
# Cancel any timer which would restart the publication process
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
publishing = self.publishing
self.publishing = False
if self._publication is not None:
notification_center = NotificationCenter()
if publishing:
self._publication.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidEnd':
break
except (SIPPublicationDidFail, SIPPublicationDidNotEnd):
notification_center.post_notification(self.__nickname__ + 'PublicationDidNotEnd', sender=self)
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidEnd', sender=self)
notification_center.remove_observer(self, sender=self._publication)
self._publication = None
command.signal()
def _CH_terminate(self, command):
self._CH_unpublish(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPPublicationDidSucceed(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidFail(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidFail(notification.data))
def _NH_SIPPublicationDidEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidNotEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidNotEnd(notification.data))
def _NH_SIPPublicationWillExpire(self, notification):
self._publish(SameState)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and set(['__id__', 'auth.password', 'auth.username', 'sip.outbound_proxy', 'sip.transport_list', 'sip.publish_interval']).intersection(notification.data.modified):
self._command_channel.send(Command('unpublish'))
self._command_channel.send(Command('publish', state=self.state))
- def _NH_DNSNameserversDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('publish', state=self.state))
- def _NH_SystemIPAddressDidChange(self, notification):
- if self.active:
- self._command_channel.send(Command('publish', state=self.state))
-
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self.active:
- self._command_channel.send(Command('unpublish'))
- self._command_channel.send(Command('publish', state=self.state))
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
class PresencePublisher(Publisher):
"""A publisher for presence state"""
@property
def event(self):
return 'presence'
@property
def payload_type(self):
return PIDFDocument
def _NH_PresencePublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
class DialogPublisher(Publisher):
"""A publisher for dialog info state"""
@property
def event(self):
return 'dialog'
@property
def payload_type(self):
return DialogInfoDocument
def _NH_DialogPublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
diff --git a/sipsimple/account/registration.py b/sipsimple/account/registration.py
index 9b31ccb7..658c00ba 100644
--- a/sipsimple/account/registration.py
+++ b/sipsimple/account/registration.py
@@ -1,322 +1,301 @@
# Copyright (C) 2008-2012 AG Projects. See LICENSE for details.
#
"""Implements the registration handler"""
__all__ = ['Registrar']
import random
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implements
from sipsimple.core import ContactHeader, FromHeader, Header, Registration, RouteHeader, SIPURI, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('register', refresh_interval=None)
class SIPRegistrationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPRegistrationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class RegistrationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class Registrar(object):
implements(IObserver)
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.registered = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._registration = None
self._dns_wait = 1
self._register_wait = 1
self._registration_timer = None
- self._wakeup_timer = None
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
- notification_center.add_observer(self, name='DNSNameserversDidChange')
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
if self.account.sip.register:
self.activate()
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
- notification_center.remove_observer(self, name='DNSNameserversDidChange')
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('register'))
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unregister'))
def reregister(self):
if self.active:
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_register(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
# Initialize the registration
if self._registration is None:
duration = command.refresh_interval or self.account.sip.register_interval
self._registration = Registration(FromHeader(self.account.uri, self.account.display_name), credentials=self.account.credentials, duration=duration, extra_headers=[Header('Supported', 'gruu')])
notification_center.add_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountWillRegister', sender=self.account)
else:
notification_center.post_notification('SIPAccountRegistrationWillRefresh', sender=self.account)
try:
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
retry_after = random.uniform(self._dns_wait, 2*self._dns_wait)
self._dns_wait = limit(2*self._dns_wait, max=30)
raise RegistrationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
# Register by trying each route in turn
register_timeout = time() + 30
for route in routes:
remaining_time = register_timeout-time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
contact_header = ContactHeader(contact_uri)
contact_header.parameters['+sip.instance'] = '"<%s>"' % settings.instance_id
if self.account.nat_traversal.use_ice:
contact_header.parameters['+sip.ice'] = None
route_header = RouteHeader(route.uri)
try:
self._registration.register(contact_header, route_header, timeout=limit(remaining_time, min=1, max=10))
except SIPCoreError:
raise RegistrationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidSucceed':
break
except SIPRegistrationDidFail, e:
notification_data = NotificationData(code=e.data.code, reason=e.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
if e.data.code == 401:
# Authentication failed, so retry the registration in some time
raise RegistrationError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.register_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise RegistrationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
else:
# Otherwise just try the next route
continue
else:
notification_data = NotificationData(code=notification.data.code, reason=notification.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
self.registered = True
# Save GRUU
try:
header = next(header for header in notification.data.contact_header_list if header.parameters.get('+sip.instance', '').strip('"<>') == settings.instance_id)
except StopIteration:
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
else:
public_gruu = header.parameters.get('pub-gruu', None)
temporary_gruu = header.parameters.get('temp-gruu', None)
try:
self.account.contact.public_gruu = SIPURI.parse(public_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.public_gruu = None
try:
self.account.contact.temporary_gruu = SIPURI.parse(temporary_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.temporary_gruu = None
notification_data = NotificationData(contact_header=notification.data.contact_header,
contact_header_list=notification.data.contact_header_list,
expires=notification.data.expires_in, registrar=route)
notification_center.post_notification('SIPAccountRegistrationDidSucceed', sender=self.account, data=notification_data)
self._register_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the registration
retry_after = random.uniform(self._register_wait, 2*self._register_wait)
self._register_wait = limit(self._register_wait*2, max=30)
raise RegistrationError('No more routes to try', retry_after=retry_after)
except RegistrationError, e:
self.registered = False
notification_center.remove_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountRegistrationDidFail', sender=self.account, data=NotificationData(error=e.error, retry_after=e.retry_after))
def register():
if self.active:
self._command_channel.send(Command('register', command.event, refresh_interval=e.refresh_interval))
self._registration_timer = None
self._registration_timer = reactor.callLater(e.retry_after, register)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
def _CH_unregister(self, command):
# Cancel any timer which would restart the registration process
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
registered = self.registered
self.registered = False
if self._registration is not None:
notification_center = NotificationCenter()
if registered:
self._registration.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidEnd':
break
except (SIPRegistrationDidFail, SIPRegistrationDidNotEnd), e:
notification_center.post_notification('SIPAccountRegistrationDidNotEnd', sender=self.account, data=NotificationData(code=e.data.code, reason=e.data.reason,
registration=self._registration))
else:
notification_center.post_notification('SIPAccountRegistrationDidEnd', sender=self.account, data=NotificationData(registration=self._registration))
notification_center.remove_observer(self, sender=self._registration)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
command.signal()
def _CH_terminate(self, command):
self._CH_unregister(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRegistrationDidSucceed(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidFail(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidFail(notification.data))
def _NH_SIPRegistrationDidEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidNotEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidNotEnd(notification.data))
def _NH_SIPRegistrationWillExpire(self, notification):
if self.active:
self._command_channel.send(Command('register'))
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'sip.register' in notification.data.modified:
if self.account.sip.register:
self.activate()
else:
self.deactivate()
elif self.active and set(['__id__', 'auth.password', 'auth.username', 'nat_traversal.use_ice', 'sip.outbound_proxy', 'sip.transport_list', 'sip.register_interval']).intersection(notification.data.modified):
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
- def _NH_DNSNameserversDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('register'))
- def _NH_SystemIPAddressDidChange(self, notification):
- if self.active:
- self._command_channel.send(Command('register'))
-
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self.active:
- self._command_channel.send(Command('register'))
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
-
diff --git a/sipsimple/account/subscription.py b/sipsimple/account/subscription.py
index 9bc10bb2..e9794705 100644
--- a/sipsimple/account/subscription.py
+++ b/sipsimple/account/subscription.py
@@ -1,518 +1,498 @@
# Copyright (C) 2008-2012 AG Projects. See LICENSE for details.
#
"""Implements the subscription handlers"""
__all__ = ['Subscriber', 'MWISubscriber', 'PresenceWinfoSubscriber', 'DialogWinfoSubscriber', 'PresenceSubscriber', 'SelfPresenceSubscriber', 'DialogSubscriber']
import random
from abc import ABCMeta, abstractproperty
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implements
from sipsimple.core import ContactHeader, FromHeader, Header, RouteHeader, SIPURI, Subscription, ToHeader, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('subscribe', refresh_interval=None)
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class InterruptSubscription(Exception): pass
class TerminateSubscription(Exception): pass
class Content(object):
def __init__(self, body, type):
self.body = body
self.type = type
class SubscriberNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-10] if name.endswith('Subscriber') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
class Subscriber(object):
__metaclass__ = ABCMeta
__nickname__ = SubscriberNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
implements(IObserver)
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
- self._wakeup_timer = None
@abstractproperty
def event(self):
return None
@property
def subscription_uri(self):
return self.account.id
@property
def content(self):
return Content(None, None)
@property
def extra_headers(self):
return []
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
- notification_center.add_observer(self, name='DNSNameserversDidChange')
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
- notification_center.remove_observer(self, name='DNSNameserversDidChange')
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('subscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unsubscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
def resubscribe(self):
if self.active:
self._command_channel.send(Command('subscribe'))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
self._CH_unsubscribe(command)
raise proc.ProcExit
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
subscription_uri = self.subscription_uri
refresh_interval = command.refresh_interval or self.account.sip.subscribe_interval
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
try:
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
elif self.account.sip.always_use_my_proxy:
uri = SIPURI(host=self.account.id.domain)
else:
uri = SIPURI(host=subscription_uri.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
except DNSLookupError, e:
raise SubscriptionError('DNS lookup failed: %s' % e, retry_after=random.uniform(15, 30))
subscription_uri = SIPURI(user=subscription_uri.username, host=subscription_uri.domain)
content = self.content
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
subscription = Subscription(subscription_uri, FromHeader(self.account.uri, self.account.display_name),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
self.event,
RouteHeader(route.uri),
credentials=self.account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(body=content.body, content_type=content.type, extra_headers=self.extra_headers, timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError('Internal error', retry_after=5)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail, e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.subscribe_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise SubscriptionError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError('Method or event not supported', retry_after=3600)
elif e.data.code == 1400:
raise SubscriptionError(e.data.reason, retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
raise SubscriptionError('No more routes to try', retry_after=random.uniform(60, 180))
# At this point it is subscribed. Handle notifications and ending/failures.
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidStart', sender=self)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionGotNotify':
notification_center.post_notification(self.__nickname__ + 'SubscriptionGotNotify', sender=self, data=notification.data)
elif notification.name == 'SIPSubscriptionDidEnd':
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='remote'))
if self.active:
self._command_channel.send(Command('subscribe'))
break
except SIPSubscriptionDidFail:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
if self.active:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription, e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
finally:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except TerminateSubscription, e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except SubscriptionError, e:
def subscribe():
if self.active:
self._command_channel.send(Command('subscribe', command.event, refresh_interval=e.refresh_interval))
self._subscription_timer = None
self._subscription_timer = reactor.callLater(e.retry_after, subscribe)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
if notification.sender is self._subscription:
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
- def _NH_DNSNameserversDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('subscribe'))
- def _NH_SystemIPAddressDidChange(self, notification):
- if self.active:
- self._command_channel.send(Command('subscribe'))
-
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self.active:
- self._command_channel.send(Command('subscribe'))
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
class MWISubscriber(Subscriber):
"""Message Waiting Indicator subscriber"""
@property
def event(self):
return 'message-summary'
@property
def subscription_uri(self):
return self.account.message_summary.voicemail_uri or self.account.id
def _NH_MWISubscriberWillStart(self, notification):
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberDidStart(self, notification):
if self.account.message_summary.enabled:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'message_summary.enabled' in notification.data.modified:
if self.account.message_summary.enabled:
self.activate()
else:
self.deactivate()
elif self.active and set(['__id__', 'auth.password', 'auth.username', 'message_summary.voicemail_uri', 'sip.always_use_my_proxy', 'sip.outbound_proxy',
'sip.subscribe_interval', 'sip.transport_list']).intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class AbstractPresenceSubscriber(Subscriber):
"""Abstract class defining behavior for all presence subscribers"""
__transports__ = frozenset(['tls', 'tcp'])
def _NH_AbstractPresenceSubscriberWillStart(self, notification):
notification.center.add_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberDidStart(self, notification):
if self.account.presence.enabled and self.account.xcap.discovered:
self.activate()
def _NH_SIPAccountDidDiscoverXCAPSupport(self, notification):
if self.account.presence.enabled and not self.active:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started or not self.account.xcap.discovered:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and set(['__id__', 'auth.password', 'auth.username', 'sip.always_use_my_proxy', 'sip.outbound_proxy',
'sip.subscribe_interval', 'sip.transport_list']).intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class PresenceWinfoSubscriber(AbstractPresenceSubscriber):
"""Presence Watcher Info subscriber"""
_NH_PresenceWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence.winfo'
class DialogWinfoSubscriber(AbstractPresenceSubscriber):
"""Dialog Watcher Info subscriber"""
_NH_DialogWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog.winfo'
class PresenceSubscriber(AbstractPresenceSubscriber):
"""Presence subscriber"""
_NH_PresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_presence_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
class SelfPresenceSubscriber(AbstractPresenceSubscriber):
"""Self presence subscriber"""
_NH_SelfPresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_SelfPresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_SelfPresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.id
class DialogSubscriber(AbstractPresenceSubscriber):
"""Dialog subscriber"""
_NH_DialogSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_dialog_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
diff --git a/sipsimple/application.py b/sipsimple/application.py
index c60ec7c1..c468a41a 100644
--- a/sipsimple/application.py
+++ b/sipsimple/application.py
@@ -1,436 +1,454 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a high-level application responsable for starting and stopping
various sub-systems required to implement a fully featured SIP User Agent
application.
"""
from __future__ import absolute_import
__all__ = ["SIPApplication"]
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import classproperty
from application.python.types import Singleton
from eventlib import coros, proc
from operator import attrgetter
from threading import RLock, Thread
from twisted.internet import reactor
from uuid import uuid4
from xcaplib import client as xcap_client
from zope.interface import implements
from sipsimple.account import AccountManager
from sipsimple.addressbook import AddressbookManager
from sipsimple.audio import AudioDevice, RootAudioBridge
from sipsimple.configuration import ConfigurationManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioMixer, Engine, SIPCoreError
from sipsimple.lookup import DNSManager
from sipsimple.session import SessionManager
from sipsimple.storage import ISIPSimpleStorage
from sipsimple.threading import ThreadManager, run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
class ApplicationAttribute(object):
def __init__(self, value):
self.value = value
def __get__(self, obj, objtype):
return self.value
def __set__(self, obj, value):
self.value = value
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
class SIPApplication(object):
__metaclass__ = Singleton
implements(IObserver)
storage = ApplicationAttribute(value=None)
state = ApplicationAttribute(value=None)
end_reason = ApplicationAttribute(value=None)
alert_audio_device = ApplicationAttribute(value=None)
alert_audio_bridge = ApplicationAttribute(value=None)
voice_audio_device = ApplicationAttribute(value=None)
voice_audio_bridge = ApplicationAttribute(value=None)
_channel = ApplicationAttribute(value=coros.queue())
- _wakeup_timer = ApplicationAttribute(value=None)
_lock = ApplicationAttribute(value=RLock())
+ _timer = ApplicationAttribute(value=None)
engine = Engine()
running = classproperty(lambda cls: cls.state == 'started')
alert_audio_mixer = classproperty(lambda cls: cls.alert_audio_bridge.mixer if cls.alert_audio_bridge else None)
voice_audio_mixer = classproperty(lambda cls: cls.voice_audio_bridge.mixer if cls.voice_audio_bridge else None)
def start(self, storage):
if not ISIPSimpleStorage.providedBy(storage):
raise TypeError("storage must implement the ISIPSimpleStorage interface")
with self._lock:
if self.state is not None:
raise RuntimeError("SIPApplication cannot be started from '%s' state" % self.state)
self.state = 'starting'
self.storage = storage
thread_manager = ThreadManager()
thread_manager.start()
configuration_manager = ConfigurationManager()
addressbook_manager = AddressbookManager()
account_manager = AccountManager()
# load configuration
try:
configuration_manager.start()
SIPSimpleSettings()
account_manager.load()
addressbook_manager.load()
except:
self.state = None
self.storage = None
raise
# start the reactor thread
self.thread = Thread(name='Reactor Thread', target=self._run_reactor)
self.thread.start()
def stop(self):
with self._lock:
if self.state in (None, 'stopping', 'stopped'):
return
prev_state = self.state
self.state = 'stopping'
self.end_reason = 'application request'
notification_center = NotificationCenter()
notification_center.post_notification('SIPApplicationWillEnd', sender=self)
if prev_state != 'starting':
self._shutdown_subsystems()
def _run_reactor(self):
from eventlib.twistedutil import join_reactor
notification_center = NotificationCenter()
self._initialize_subsystems()
reactor.run(installSignalHandlers=False)
self.state = 'stopped'
notification_center.post_notification('SIPApplicationDidEnd', sender=self, data=NotificationData(end_reason=self.end_reason))
+ def _initialize_tls(self):
+ engine = Engine()
+ settings = SIPSimpleSettings()
+ account_manager = AccountManager()
+ account = account_manager.default_account
+ try:
+ engine.set_tls_options(port=settings.sip.tls_port,
+ verify_server=account.tls.verify_server,
+ ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
+ cert_file=account.tls.certificate.normalized if account.tls.certificate else None,
+ privkey_file=account.tls.certificate.normalized if account.tls.certificate else None,
+ timeout=settings.tls.timeout)
+ except Exception, e:
+ notification_center = NotificationCenter()
+ notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
+
@run_in_green_thread
def _initialize_subsystems(self):
account_manager = AccountManager()
addressbook_manager = AddressbookManager()
dns_manager = DNSManager()
engine = Engine()
notification_center = NotificationCenter()
session_manager = SessionManager()
settings = SIPSimpleSettings()
xcap_client.DEFAULT_HEADERS = {'User-Agent': settings.user_agent}
notification_center.post_notification('SIPApplicationWillStart', sender=self)
if self.state == 'stopping':
reactor.stop()
return
- account = account_manager.default_account
-
# initialize core
notification_center.add_observer(self, sender=engine)
options = dict(# general
user_agent=settings.user_agent,
# SIP
detect_sip_loops=True,
udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None,
tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None,
tls_port=None,
# TLS
tls_verify_server=False,
tls_ca_file=None,
tls_cert_file=None,
tls_privkey_file=None,
tls_timeout=3000,
# rtp
rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end),
# audio
codecs=list(settings.rtp.audio_codec_list),
# logging
log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0,
trace_sip=settings.logs.trace_sip,
)
try:
engine.start(**options)
except SIPCoreError:
self.end_reason = 'engine failed'
reactor.stop()
return
# initialize TLS
- try:
- engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None,
- verify_server=account.tls.verify_server if account else False,
- ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
- cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
- privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
- timeout=settings.tls.timeout)
- except Exception, e:
- notification_center = NotificationCenter()
- notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
+ self._initialize_tls()
# initialize PJSIP internal resolver
engine.set_nameservers(dns_manager.nameservers)
# initialize audio objects
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0
voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, tail_length)
voice_mixer.muted = settings.audio.muted
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0)
if settings.audio.silent:
alert_mixer.output_volume = 0
self.alert_audio_device = AudioDevice(alert_mixer)
self.alert_audio_bridge = RootAudioBridge(alert_mixer)
self.alert_audio_bridge.add(self.alert_audio_device)
settings.audio.input_device = voice_mixer.input_device
settings.audio.output_device = voice_mixer.output_device
settings.audio.alert_device = alert_mixer.output_device
settings.save()
# initialize instance id
if not settings.instance_id:
settings.instance_id = uuid4().urn
settings.save()
# initialize middleware components
dns_manager.start()
account_manager.start()
addressbook_manager.start()
session_manager.start()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange')
notification_center.add_observer(self, name='DNSNameserversDidChange')
+ notification_center.add_observer(self, name='SystemIPAddressDidChange')
+ notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
self.state = 'started'
notification_center.post_notification('SIPApplicationDidStart', sender=self)
@run_in_green_thread
def _shutdown_subsystems(self):
# cleanup internals
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
+ if self._timer is not None and self._timer.active():
+ self._timer.cancel()
+ self._timer = None
# shutdown middleware components
dns_manager = DNSManager()
account_manager = AccountManager()
addressbook_manager = AddressbookManager()
session_manager = SessionManager()
procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(addressbook_manager.stop), proc.spawn(session_manager.stop)]
proc.waitall(procs)
# shutdown engine
engine = Engine()
engine.stop()
while True:
notification = self._channel.wait()
if notification.name == 'SIPEngineDidEnd':
break
# stop threads
thread_manager = ThreadManager()
thread_manager.stop()
# stop the reactor
reactor.stop()
+ def _network_conditions_changed(self, restart_transports=False):
+ if self._timer is not None:
+ self._timer.restart_transports = self._timer.restart_transports or restart_transports
+ return
+ if self.running and self._timer is None:
+ def notify():
+ if self.running:
+ if self._timer.restart_transports:
+ engine = Engine()
+ notification_center = NotificationCenter()
+ settings = SIPSimpleSettings()
+ if 'tcp' in settings.sip.transport_list:
+ engine.set_tcp_port(None)
+ engine.set_tcp_port(settings.sip.tcp_port)
+ if 'tls' in settings.sip.transport_list:
+ self._initialize_tls()
+ notification_center.post_notification('NetworkConditionsDidChange', sender=self)
+ self._timer = None
+ self._timer = reactor.callLater(5, notify)
+ self._timer.restart_transports = restart_transports
+
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPEngineDidEnd(self, notification):
self._channel.send(notification)
def _NH_SIPEngineDidFail(self, notification):
if not self.running:
return
self.end_reason = 'engine failed'
notification.center.post_notification('SIPApplicationWillEnd', sender=self)
reactor.stop()
+ def _NH_SIPEngineTransportDidDisconnect(self, notification):
+ self._network_conditions_changed(restart_transports=False)
+
@run_in_thread('device-io')
def _NH_CFGSettingsObjectDidChange(self, notification):
engine = Engine()
settings = SIPSimpleSettings()
account_manager = AccountManager()
if notification.sender is settings:
if 'audio.sample_rate' in notification.data.modified:
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0
voice_mixer = AudioMixer(input_device, output_device, settings.audio.sample_rate, tail_length)
voice_mixer.muted = settings.audio.muted
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
alert_mixer = AudioMixer(None, alert_device, settings.audio.sample_rate, 0)
self.alert_audio_device = AudioDevice(alert_mixer)
self.alert_audio_bridge = RootAudioBridge(alert_mixer)
self.alert_audio_bridge.add(self.alert_audio_device)
if settings.audio.silent:
alert_mixer.output_volume = 0
settings.audio.input_device = voice_mixer.input_device
settings.audio.output_device = voice_mixer.output_device
settings.audio.alert_device = alert_mixer.output_device
settings.save()
else:
if set(['audio.input_device', 'audio.output_device', 'audio.alert_device', 'audio.echo_canceller.enabled', 'audio.echo_canceller.tail_length']).intersection(notification.data.modified):
input_device = settings.audio.input_device
if input_device not in (None, u'system_default') and input_device not in engine.input_devices:
input_device = u'system_default'
output_device = settings.audio.output_device
if output_device not in (None, u'system_default') and output_device not in engine.output_devices:
output_device = u'system_default'
tail_length = settings.audio.echo_canceller.tail_length if settings.audio.echo_canceller.enabled else 0
if (input_device, output_device, tail_length) != attrgetter('input_device', 'output_device', 'ec_tail_length')(self.voice_audio_bridge.mixer):
self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, tail_length)
settings.audio.input_device = self.voice_audio_bridge.mixer.input_device
settings.audio.output_device = self.voice_audio_bridge.mixer.output_device
settings.save()
alert_device = settings.audio.alert_device
if alert_device not in (None, u'system_default') and alert_device not in engine.output_devices:
alert_device = u'system_default'
if alert_device != self.alert_audio_bridge.mixer.output_device:
self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0)
settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device
settings.save()
if 'audio.muted' in notification.data.modified:
self.voice_audio_bridge.mixer.muted = settings.audio.muted
if 'audio.silent' in notification.data.modified:
if settings.audio.silent:
self.alert_audio_bridge.mixer.output_volume = 0
else:
self.alert_audio_bridge.mixer.output_volume = 100
if 'user_agent' in notification.data.modified:
engine.user_agent = settings.user_agent
if 'sip.udp_port' in notification.data.modified:
engine.set_udp_port(settings.sip.udp_port)
if 'sip.tcp_port' in notification.data.modified:
engine.set_tcp_port(settings.sip.tcp_port)
if set(('sip.tls_port', 'tls.ca_list', 'tls.timeout', 'default_account')).intersection(notification.data.modified):
- account = account_manager.default_account
- try:
- engine.set_tls_options(port=settings.sip.tls_port,
- verify_server=account.tls.verify_server if account else False,
- ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
- cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
- privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None,
- timeout=settings.tls.timeout)
- except Exception, e:
- notification_center = NotificationCenter()
- notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
+ self._initialize_tls()
if 'rtp.port_range' in notification.data.modified:
engine.rtp_port_range = (settings.rtp.port_range.start, settings.rtp.port_range.end)
if 'rtp.audio_codec_list' in notification.data.modified:
engine.codecs = list(settings.rtp.audio_codec_list)
if 'logs.trace_sip' in notification.data.modified:
engine.trace_sip = settings.logs.trace_sip
if set(('logs.trace_pjsip', 'logs.pjsip_level')).intersection(notification.data.modified):
engine.log_level = settings.logs.pjsip_level if settings.logs.trace_pjsip else 0
elif notification.sender is account_manager.default_account:
if set(('tls.verify_server', 'tls.certificate')).intersection(notification.data.modified):
- account = account_manager.default_account
- try:
- engine.set_tls_options(port=settings.sip.tls_port,
- verify_server=account.tls.verify_server,
- ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None,
- cert_file=account.tls.certificate.normalized if account.tls.certificate else None,
- privkey_file=account.tls.certificate.normalized if account.tls.certificate else None,
- timeout=settings.tls.timeout)
- except Exception, e:
- notification_center = NotificationCenter()
- notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e))
+ self._initialize_tls()
@run_in_thread('device-io')
def _NH_DefaultAudioDeviceDidChange(self, notification):
if None in (self.voice_audio_bridge, self.alert_audio_bridge):
return
settings = SIPSimpleSettings()
current_input_device = self.voice_audio_bridge.mixer.input_device
current_output_device = self.voice_audio_bridge.mixer.output_device
current_alert_device = self.alert_audio_bridge.mixer.output_device
ec_tail_length = self.voice_audio_bridge.mixer.ec_tail_length
if notification.data.changed_input and u'system_default' in (current_input_device, settings.audio.input_device):
self.voice_audio_bridge.mixer.set_sound_devices(u'system_default', current_output_device, ec_tail_length)
if notification.data.changed_output and u'system_default' in (current_output_device, settings.audio.output_device):
self.voice_audio_bridge.mixer.set_sound_devices(current_input_device, u'system_default', ec_tail_length)
if notification.data.changed_output and u'system_default' in (current_alert_device, settings.audio.alert_device):
self.alert_audio_bridge.mixer.set_sound_devices(None, u'system_default', 0)
@run_in_thread('device-io')
def _NH_AudioDevicesDidChange(self, notification):
old_devices = set(notification.data.old_devices)
new_devices = set(notification.data.new_devices)
removed_devices = old_devices - new_devices
if not removed_devices:
return
input_device = self.voice_audio_bridge.mixer.input_device
output_device = self.voice_audio_bridge.mixer.output_device
alert_device = self.alert_audio_bridge.mixer.output_device
if self.voice_audio_bridge.mixer.real_input_device in removed_devices:
input_device = u'system_default' if new_devices else None
if self.voice_audio_bridge.mixer.real_output_device in removed_devices:
output_device = u'system_default' if new_devices else None
if self.alert_audio_bridge.mixer.real_output_device in removed_devices:
alert_device = u'system_default' if new_devices else None
self.voice_audio_bridge.mixer.set_sound_devices(input_device, output_device, self.voice_audio_bridge.mixer.ec_tail_length)
self.alert_audio_bridge.mixer.set_sound_devices(None, alert_device, 0)
settings = SIPSimpleSettings()
settings.audio.input_device = self.voice_audio_bridge.mixer.input_device
settings.audio.output_device = self.voice_audio_bridge.mixer.output_device
settings.audio.alert_device = self.alert_audio_bridge.mixer.output_device
settings.save()
def _NH_DNSNameserversDidChange(self, notification):
if self.running:
engine = Engine()
engine.set_nameservers(notification.data.nameservers)
+ notification.center.post_notification('NetworkConditionsDidChange', sender=self)
+
+ def _NH_SystemIPAddressDidChange(self, notification):
+ self._network_conditions_changed(restart_transports=True)
+
+ def _NH_SystemDidWakeupFromSleep(self, notification):
+ self._network_conditions_changed(restart_transports=True)
diff --git a/sipsimple/core/_core.lib.pxi b/sipsimple/core/_core.lib.pxi
index 52827828..6a87bc9f 100644
--- a/sipsimple/core/_core.lib.pxi
+++ b/sipsimple/core/_core.lib.pxi
@@ -1,319 +1,344 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
import sys
# classes
cdef class PJLIB:
def __cinit__(self):
cdef int status
status = pj_init()
if status != 0:
raise PJSIPError("Could not initialize PJLIB", status)
self._init_done = 1
status = pjlib_util_init()
if status != 0:
raise PJSIPError("Could not initialize PJLIB-UTIL", status)
status = pjnath_init()
if status != 0:
raise PJSIPError("Could not initialize PJNATH", status)
def __dealloc__(self):
if self._init_done:
pj_shutdown()
cdef class PJCachingPool:
def __cinit__(self):
pj_caching_pool_init(&self._obj, &pj_pool_factory_default_policy, 0)
self._init_done = 1
def __dealloc__(self):
if self._init_done:
pj_caching_pool_destroy(&self._obj)
cdef class PJSIPEndpoint:
def __cinit__(self, PJCachingPool caching_pool, ip_address, udp_port, tcp_port, tls_port,
tls_verify_server, tls_ca_file, tls_cert_file, tls_privkey_file, int tls_timeout):
+ cdef pjsip_tpmgr *tpmgr
cdef pj_dns_resolver *resolver
cdef int status
+
if ip_address is not None and not _is_valid_ip(pj_AF_INET(), ip_address):
raise ValueError("Not a valid IPv4 address: %s" % ip_address)
+ self._local_ip_used = ip_address
+
status = pjsip_endpt_create(&caching_pool._obj.factory, "core", &self._obj)
if status != 0:
raise PJSIPError("Could not initialize PJSIP endpoint", status)
self._pool = pjsip_endpt_create_pool(self._obj, "PJSIPEndpoint", 4096, 4096)
if self._pool == NULL:
raise SIPCoreError("Could not allocate memory pool")
+
status = pjsip_tsx_layer_init_module(self._obj)
if status != 0:
raise PJSIPError("Could not initialize transaction layer module", status)
status = pjsip_ua_init_module(self._obj, NULL) # TODO: handle forking
if status != 0:
raise PJSIPError("Could not initialize common dialog layer module", status)
status = pjsip_evsub_init_module(self._obj)
if status != 0:
raise PJSIPError("Could not initialize event subscription module", status)
status = pjsip_100rel_init_module(self._obj)
if status != 0:
raise PJSIPError("Could not initialize 100rel module", status)
status = pjsip_replaces_init_module(self._obj)
if status != 0:
raise PJSIPError("Could not initialize replaces module", status)
status = pjsip_inv_usage_init(self._obj, &_inv_cb)
if status != 0:
raise PJSIPError("Could not initialize invitation module", status)
status = pjsip_endpt_create_resolver(self._obj, &resolver)
if status != 0:
raise PJSIPError("Could not create fake DNS resolver for endpoint", status)
status = pjsip_endpt_set_resolver(self._obj, resolver)
if status != 0:
raise PJSIPError("Could not set fake DNS resolver on endpoint", status)
- self._local_ip_used = ip_address
+
+ tpmgr = pjsip_endpt_get_tpmgr(self._obj)
+ if tpmgr == NULL:
+ raise SIPCoreError("Could not get the transport manager")
+ status = pjsip_tpmgr_set_state_cb(tpmgr, _transport_state_cb)
+ if status != 0:
+ raise PJSIPError("Could not set transport state callback", status)
if udp_port is not None:
self._start_udp_transport(udp_port)
if tcp_port is not None:
self._start_tcp_transport(tcp_port)
self._tls_verify_server = int(tls_verify_server)
if tls_ca_file is not None:
self._tls_ca_file = PJSTR(tls_ca_file.encode(sys.getfilesystemencoding()))
if tls_cert_file is not None:
self._tls_cert_file = PJSTR(tls_cert_file.encode(sys.getfilesystemencoding()))
if tls_privkey_file is not None:
self._tls_privkey_file = PJSTR(tls_privkey_file.encode(sys.getfilesystemencoding()))
if tls_timeout < 0:
raise ValueError("Invalid TLS timeout value: %d" % tls_timeout)
self._tls_timeout = tls_timeout
if tls_port is not None:
self._start_tls_transport(tls_port)
cdef int _make_local_addr(self, pj_sockaddr_in *local_addr, object ip_address, int port) except -1:
cdef pj_str_t local_ip_pj
cdef pj_str_t *local_ip_p = NULL
cdef int status
if not (0 <= port <= 65535):
raise SIPCoreError("Invalid port: %d" % port)
if ip_address is not None and ip_address is not "0.0.0.0":
local_ip_p = &local_ip_pj
_str_to_pj_str(ip_address, local_ip_p)
status = pj_sockaddr_in_init(local_addr, local_ip_p, port)
if status != 0:
raise PJSIPError("Could not create local address", status)
return 0
cdef int _start_udp_transport(self, int port) except -1:
cdef pj_sockaddr_in local_addr
self._make_local_addr(&local_addr, self._local_ip_used, port)
status = pjsip_udp_transport_start(self._obj, &local_addr, NULL, 1, &self._udp_transport)
if status != 0:
raise PJSIPError("Could not create UDP transport", status)
return 0
cdef int _stop_udp_transport(self) except -1:
pjsip_transport_shutdown(self._udp_transport)
self._udp_transport = NULL
return 0
cdef int _start_tcp_transport(self, int port) except -1:
cdef pj_sockaddr_in local_addr
self._make_local_addr(&local_addr, self._local_ip_used, port)
status = pjsip_tcp_transport_start2(self._obj, &local_addr, NULL, 1, &self._tcp_transport)
if status != 0:
raise PJSIPError("Could not create TCP transport", status)
return 0
cdef int _stop_tcp_transport(self) except -1:
self._tcp_transport.destroy(self._tcp_transport)
self._tcp_transport = NULL
return 0
cdef int _start_tls_transport(self, port) except -1:
cdef pj_sockaddr_in local_addr
cdef pjsip_tls_setting tls_setting
self._make_local_addr(&local_addr, self._local_ip_used, port)
pjsip_tls_setting_default(&tls_setting)
# The following value needs to be reasonably low, as TLS negotiation hogs the PJSIP polling loop
tls_setting.timeout.sec = self._tls_timeout / 1000
tls_setting.timeout.msec = self._tls_timeout % 1000
if self._tls_ca_file is not None:
tls_setting.ca_list_file = self._tls_ca_file.pj_str
if self._tls_cert_file is not None:
tls_setting.cert_file = self._tls_cert_file.pj_str
if self._tls_privkey_file is not None:
tls_setting.privkey_file = self._tls_privkey_file.pj_str
tls_setting.method = PJSIP_TLSV1_METHOD
tls_setting.verify_server = self._tls_verify_server
status = pjsip_tls_transport_start(self._obj, &tls_setting, &local_addr, NULL, 1, &self._tls_transport)
if status in (PJSIP_TLS_EUNKNOWN, PJSIP_TLS_EINVMETHOD, PJSIP_TLS_ECACERT, PJSIP_TLS_ECERTFILE, PJSIP_TLS_EKEYFILE, PJSIP_TLS_ECIPHER, PJSIP_TLS_ECTX):
raise PJSIPTLSError("Could not create TLS transport", status)
elif status != 0:
raise PJSIPError("Could not create TLS transport", status)
return 0
cdef int _stop_tls_transport(self) except -1:
self._tls_transport.destroy(self._tls_transport)
self._tls_transport = NULL
return 0
cdef int _set_dns_nameservers(self, list servers) except -1:
cdef int num_servers = len(servers)
cdef pj_str_t *pj_servers
cdef int status
cdef pj_dns_resolver *resolver
if num_servers == 0:
return 0
resolver = pjsip_endpt_get_resolver(self._obj)
if resolver == NULL:
raise SIPCoreError("Could not get DNS resolver on endpoint")
pj_servers = <pj_str_t *> malloc(sizeof(pj_str_t)*num_servers)
if pj_servers == NULL:
raise MemoryError()
for i, ns in enumerate(servers):
_str_to_pj_str(ns, &pj_servers[i])
status = pj_dns_resolver_set_ns(resolver, num_servers, pj_servers, NULL)
free(pj_servers)
if status != 0:
raise PJSIPError("Could not set nameservers on DNS resolver", status)
return 0
def __dealloc__(self):
+ cdef pjsip_tpmgr *tpmgr
+ tpmgr = pjsip_endpt_get_tpmgr(self._obj)
+ if tpmgr != NULL:
+ pjsip_tpmgr_set_state_cb(tpmgr, NULL)
if self._udp_transport != NULL:
self._stop_udp_transport()
if self._tcp_transport != NULL:
self._stop_tcp_transport()
if self._tls_transport != NULL:
self._stop_tls_transport()
if self._pool != NULL:
pjsip_endpt_release_pool(self._obj, self._pool)
if self._obj != NULL:
pjsip_endpt_destroy(self._obj)
cdef class PJMEDIAEndpoint:
def __cinit__(self, PJCachingPool caching_pool):
cdef int status
status = pjmedia_endpt_create(&caching_pool._obj.factory, NULL, 1, &self._obj)
if status != 0:
raise PJSIPError("Could not create PJMEDIA endpoint", status)
status = pjmedia_codec_speex_init(self._obj, PJMEDIA_SPEEX_NO_NB, -1, -1)
if status != 0:
raise PJSIPError("Could not initialize speex codec", status)
self._has_speex = 1
status = pjmedia_codec_g722_init(self._obj)
if status != 0:
raise PJSIPError("Could not initialize G.722 codec", status)
self._has_g722 = 1
pjmedia_codec_g711_init(self._obj)
if status != 0:
raise PJSIPError("Could not initialize G.711 codecs", status)
self._has_g711 = 1
status = pjmedia_codec_ilbc_init(self._obj, 20)
if status != 0:
raise PJSIPError("Could not initialize iLBC codec", status)
self._has_ilbc = 1
status = pjmedia_codec_gsm_init(self._obj)
if status != 0:
raise PJSIPError("Could not initialize GSM codec", status)
self._has_gsm = 1
status = pjmedia_codec_opus_init(self._obj)
if status != 0:
raise PJSIPError("Could not initialize opus codec", status)
self._has_opus = 1
def __dealloc__(self):
if self._has_opus:
pjmedia_codec_opus_deinit()
if self._has_gsm:
pjmedia_codec_gsm_deinit()
if self._has_ilbc:
pjmedia_codec_ilbc_deinit()
if self._has_g711:
pjmedia_codec_g711_deinit()
if self._has_g722:
pjmedia_codec_g722_deinit()
if self._has_speex:
pjmedia_codec_speex_deinit()
if self._obj != NULL:
pjmedia_endpt_destroy(self._obj)
cdef list _get_codecs(self):
cdef unsigned int count = PJMEDIA_CODEC_MGR_MAX_CODECS
cdef pjmedia_codec_info info[PJMEDIA_CODEC_MGR_MAX_CODECS]
cdef unsigned int prio[PJMEDIA_CODEC_MGR_MAX_CODECS]
cdef int i
cdef list retval
cdef int status
status = pjmedia_codec_mgr_enum_codecs(pjmedia_endpt_get_codec_mgr(self._obj), &count, info, prio)
if status != 0:
raise PJSIPError("Could not get available codecs", status)
retval = list()
for i from 0 <= i < count:
retval.append((prio[i], _pj_str_to_str(info[i].encoding_name), info[i].channel_cnt, info[i].clock_rate))
return retval
cdef list _get_all_codecs(self):
cdef list codecs
cdef tuple codec_data
codecs = self._get_codecs()
return list(set([codec_data[1] for codec_data in codecs]))
cdef list _get_current_codecs(self):
cdef list codecs
cdef tuple codec_data
cdef list retval
codecs = [codec_data for codec_data in self._get_codecs() if codec_data[0] > 0]
codecs.sort(reverse=True)
retval = list(set([codec_data[1] for codec_data in codecs]))
return retval
cdef int _set_codecs(self, list req_codecs) except -1:
cdef object new_codecs
cdef object all_codecs
cdef object codec_set
cdef list codecs
cdef tuple codec_data
cdef str codec
cdef int sample_rate
cdef int channel_count
cdef str codec_name
cdef int prio
cdef list codec_prio
cdef pj_str_t codec_pj
new_codecs = set(req_codecs)
if len(new_codecs) != len(req_codecs):
raise ValueError("Requested codec list contains doubles")
all_codecs = set(self._get_all_codecs())
codec_set = new_codecs.difference(all_codecs)
if len(codec_set) > 0:
raise SIPCoreError("Unknown codec(s): %s" % ", ".join(codec_set))
# reverse the codec data tuples so that we can easily sort on sample rate
# to make sure that bigger sample rates get higher priority
codecs = [list(reversed(codec_data)) for codec_data in self._get_codecs()]
codecs.sort(reverse=True)
codec_prio = list()
for codec in req_codecs:
for sample_rate, channel_count, codec_name, prio in codecs:
if codec == codec_name and channel_count == 1:
codec_prio.append("%s/%d/%d" % (codec_name, sample_rate, channel_count))
for prio, codec in enumerate(reversed(codec_prio)):
_str_to_pj_str(codec, &codec_pj)
status = pjmedia_codec_mgr_set_codec_priority(pjmedia_endpt_get_codec_mgr(self._obj), &codec_pj, prio + 1)
if status != 0:
raise PJSIPError("Could not set codec priority", status)
for sample_rate, channel_count, codec_name, prio in codecs:
if codec_name not in req_codecs or channel_count > 1:
codec = "%s/%d/%d" % (codec_name, sample_rate, channel_count)
_str_to_pj_str(codec, &codec_pj)
status = pjmedia_codec_mgr_set_codec_priority(pjmedia_endpt_get_codec_mgr(self._obj), &codec_pj, 0)
if status != 0:
raise PJSIPError("Could not set codec priority", status)
return 0
+
+cdef void _transport_state_cb(pjsip_transport *tp, pjsip_transport_state state, pjsip_transport_state_info_ptr_const info) with gil:
+ cdef PJSIPUA ua
+ try:
+ ua = _get_ua()
+ except:
+ return
+ if state == PJSIP_TP_STATE_DISCONNECTED and info.status != 0:
+ _add_event("SIPEngineTransportDidDisconnect", dict(transport=tp.type_name.lower(), reason=_pj_status_to_str(info.status)))
+
diff --git a/sipsimple/core/_core.pxd b/sipsimple/core/_core.pxd
index a518452e..464ce553 100644
--- a/sipsimple/core/_core.pxd
+++ b/sipsimple/core/_core.pxd
@@ -1,2158 +1,2170 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
cdef extern from *:
ctypedef char *char_ptr_const "const char *"
enum:
PJ_SVN_REV "PJ_SVN_REVISION"
# system imports
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy
# Python C imports
from cpython.float cimport PyFloat_AsDouble
from cpython.ref cimport Py_INCREF, Py_DECREF
from cpython.string cimport PyString_FromString, PyString_FromStringAndSize, PyString_AsString, PyString_Size
# PJSIP imports
cdef extern from "pjlib.h":
# constants
enum:
PJ_ERR_MSG_SIZE
enum:
PJ_ERRNO_START_SYS
PJ_EBUG
PJ_ETOOMANY
enum:
PJ_MAX_OBJ_NAME
# init / shutdown
int pj_init() nogil
void pj_shutdown() nogil
# version
char *pj_get_version() nogil
# string
struct pj_str_t:
char *ptr
int slen
# errors
pj_str_t pj_strerror(int statcode, char *buf, int bufsize) nogil
# logging
enum:
PJ_LOG_MAX_LEVEL
enum pj_log_decoration:
PJ_LOG_HAS_YEAR
PJ_LOG_HAS_MONTH
PJ_LOG_HAS_DAY_OF_MON
PJ_LOG_HAS_TIME
PJ_LOG_HAS_MICRO_SEC
PJ_LOG_HAS_SENDER
PJ_LOG_HAS_INDENT
void pj_log_set_decor(int decor) nogil
int pj_log_get_level() nogil
void pj_log_set_level(int level) nogil
void pj_log_set_log_func(void func(int level, char_ptr_const data, int len)) nogil
# memory management
struct pj_pool_t
struct pj_pool_factory_policy:
pass
pj_pool_factory_policy pj_pool_factory_default_policy
struct pj_pool_factory:
pass
struct pj_caching_pool:
pj_pool_factory factory
void pj_caching_pool_init(pj_caching_pool *ch_pool, pj_pool_factory_policy *policy, int max_capacity) nogil
void pj_caching_pool_destroy(pj_caching_pool *ch_pool) nogil
void *pj_pool_alloc(pj_pool_t *pool, int size) nogil
void pj_pool_reset(pj_pool_t *pool) nogil
pj_pool_t *pj_pool_create_on_buf(char *name, void *buf, int size) nogil
pj_str_t *pj_strdup2_with_null(pj_pool_t *pool, pj_str_t *dst, char *src) nogil
# threads
enum:
PJ_THREAD_DESC_SIZE
struct pj_mutex_t
struct pj_rwmutex_t
struct pj_thread_t
int pj_mutex_create_simple(pj_pool_t *pool, char *name, pj_mutex_t **mutex) nogil
int pj_mutex_create_recursive(pj_pool_t *pool, char *name, pj_mutex_t **mutex) nogil
int pj_mutex_lock(pj_mutex_t *mutex) nogil
int pj_mutex_unlock(pj_mutex_t *mutex) nogil
int pj_mutex_destroy(pj_mutex_t *mutex) nogil
int pj_rwmutex_create(pj_pool_t *pool, char *name, pj_rwmutex_t **mutex) nogil
int pj_rwmutex_lock_read(pj_rwmutex_t *mutex) nogil
int pj_rwmutex_lock_write(pj_rwmutex_t *mutex) nogil
int pj_rwmutex_unlock_read(pj_rwmutex_t *mutex) nogil
int pj_rwmutex_unlock_write(pj_rwmutex_t *mutex) nogil
int pj_rwmutex_destroy(pj_rwmutex_t *mutex) nogil
int pj_thread_is_registered() nogil
int pj_thread_register(char *thread_name, long *thread_desc, pj_thread_t **thread) nogil
# sockets
enum:
PJ_INET6_ADDRSTRLEN
struct pj_ioqueue_t
struct pj_addr_hdr:
unsigned int sa_family
struct pj_sockaddr:
pj_addr_hdr addr
struct pj_sockaddr_in:
pass
int pj_AF_INET() nogil
int pj_AF_INET6() nogil
int pj_sockaddr_in_init(pj_sockaddr_in *addr, pj_str_t *cp, int port) nogil
int pj_sockaddr_get_port(pj_sockaddr *addr) nogil
char *pj_sockaddr_print(pj_sockaddr *addr, char *buf, int size, unsigned int flags) nogil
int pj_sockaddr_has_addr(pj_sockaddr *addr) nogil
int pj_sockaddr_init(int af, pj_sockaddr *addr, pj_str_t *cp, unsigned int port) nogil
int pj_inet_pton(int af, pj_str_t *src, void *dst) nogil
# dns
struct pj_dns_resolver
int pj_dns_resolver_set_ns(pj_dns_resolver *resolver, unsigned count, pj_str_t *servers, int *ports) nogil
# time
struct pj_time_val:
long sec
long msec
void pj_gettimeofday(pj_time_val *tv) nogil
void pj_time_val_normalize(pj_time_val *tv) nogil
# timers
struct pj_timer_heap_t
struct pj_timer_entry:
void *user_data
int id
pj_timer_entry *pj_timer_entry_init(pj_timer_entry *entry, int id, void *user_data,
void cb(pj_timer_heap_t *timer_heap, pj_timer_entry *entry) with gil) nogil
# lists
struct pj_list:
void *prev
void *next
void pj_list_init(pj_list *node) nogil
void pj_list_insert_after(pj_list *pos, pj_list *node) nogil
# random
void pj_srand(unsigned int seed) nogil
# maths
struct pj_math_stat:
int n
int max
int min
int last
int mean
cdef extern from "pjlib-util.h":
# init
int pjlib_util_init() nogil
cdef extern from "pjnath.h":
# init
int pjnath_init() nogil
# STUN
enum:
PJ_STUN_PORT
struct pj_stun_config:
pass
struct pj_stun_sock_cfg:
pj_sockaddr bound_addr
void pj_stun_config_init(pj_stun_config *cfg, pj_pool_factory *factory, unsigned int options,
pj_ioqueue_t *ioqueue, pj_timer_heap_t *timer_heap) nogil
# NAT detection
struct pj_stun_nat_detect_result:
int status
char *status_text
char *nat_type_name
ctypedef pj_stun_nat_detect_result *pj_stun_nat_detect_result_ptr_const "const pj_stun_nat_detect_result *"
int pj_stun_detect_nat_type(pj_sockaddr_in *server, pj_stun_config *stun_cfg, void *user_data,
void pj_stun_nat_detect_cb(void *user_data,
pj_stun_nat_detect_result_ptr_const res) with gil) nogil
# ICE
struct pj_ice_strans
struct pj_ice_sess_cand:
int type
int comp_id
int prio
pj_sockaddr addr
pj_sockaddr rel_addr
struct pj_ice_sess_check:
pj_ice_sess_cand *lcand
pj_ice_sess_cand *rcand
int state
int nominated
ctypedef pj_ice_sess_check *pj_ice_sess_check_ptr_const "const pj_ice_sess_check *"
struct pj_ice_sess_comp:
pj_ice_sess_check *valid_check
struct pj_ice_sess_checklist:
int count
pj_ice_sess_check *checks
struct pj_ice_sess:
int comp_cnt
pj_ice_sess_comp *comp
int lcand_cnt
pj_ice_sess_cand *lcand
int rcand_cnt
pj_ice_sess_cand *rcand
pj_ice_sess_checklist valid_list
struct pj_ice_strans_cfg_stun:
pj_stun_sock_cfg cfg
pj_str_t server
unsigned int port
struct pj_ice_strans_cfg:
int af
pj_stun_config stun_cfg
pj_ice_strans_cfg_stun stun
enum pj_ice_strans_op:
PJ_ICE_STRANS_OP_INIT
PJ_ICE_STRANS_OP_NEGOTIATION
enum pj_ice_strans_state:
PJ_ICE_STRANS_STATE_NULL
PJ_ICE_STRANS_STATE_INIT
PJ_ICE_STRANS_STATE_READY
PJ_ICE_STRANS_STATE_SESS_READY
PJ_ICE_STRANS_STATE_NEGO
PJ_ICE_STRANS_STATE_RUNNING
PJ_ICE_STRANS_STATE_FAILED
enum pj_ice_cand_type:
PJ_ICE_CAND_TYPE_HOST
PJ_ICE_CAND_TYPE_SRFLX
PJ_ICE_CAND_TYPE_PRFLX
PJ_ICE_CAND_TYPE_RELAYED
enum pj_ice_sess_check_state:
PJ_ICE_SESS_CHECK_STATE_FROZEN
PJ_ICE_SESS_CHECK_STATE_WAITING
PJ_ICE_SESS_CHECK_STATE_IN_PROGRESS
PJ_ICE_SESS_CHECK_STATE_SUCCEEDED
PJ_ICE_SESS_CHECK_STATE_FAILED
void pj_ice_strans_cfg_default(pj_ice_strans_cfg *cfg) nogil
pj_ice_sess* pj_ice_strans_get_session(pj_ice_strans *ice_st)
pj_time_val pj_ice_strans_get_start_time(pj_ice_strans *ice_st)
pj_ice_sess_check_ptr_const pj_ice_strans_get_valid_pair(pj_ice_strans *ice_st, unsigned comp_id)
cdef extern from "pjmedia.h":
enum:
PJMEDIA_ENOSNDREC
PJMEDIA_ENOSNDPLAY
enum:
PJMEDIA_AUD_DEFAULT_CAPTURE_DEV
PJMEDIA_AUD_DEFAULT_PLAYBACK_DEV
PJMEDIA_AUD_DEV_CAP_EC
PJMEDIA_AUD_DEV_CAP_EC_TAIL
enum pjmedia_dir:
PJMEDIA_DIR_PLAYBACK
PJMEDIA_DIR_CAPTURE
PJMEDIA_DIR_CAPTURE_PLAYBACK
# codec manager
struct pjmedia_codec_mgr
enum:
PJMEDIA_CODEC_MGR_MAX_CODECS
struct pjmedia_codec_info:
pj_str_t encoding_name
unsigned int clock_rate
unsigned int channel_cnt
int pjmedia_codec_mgr_enum_codecs(pjmedia_codec_mgr *mgr, unsigned int *count,
pjmedia_codec_info *info, unsigned int *prio) nogil
int pjmedia_codec_mgr_set_codec_priority(pjmedia_codec_mgr *mgr, pj_str_t *codec_id, unsigned int prio) nogil
# endpoint
struct pjmedia_endpt
int pjmedia_endpt_create(pj_pool_factory *pf, pj_ioqueue_t *ioqueue, int worker_cnt, pjmedia_endpt **p_endpt) nogil
int pjmedia_endpt_destroy(pjmedia_endpt *endpt) nogil
pj_ioqueue_t *pjmedia_endpt_get_ioqueue(pjmedia_endpt *endpt) nogil
pjmedia_codec_mgr *pjmedia_endpt_get_codec_mgr(pjmedia_endpt *endpt) nogil
# codecs
int pjmedia_codec_g711_init(pjmedia_endpt *endpt) nogil
int pjmedia_codec_g711_deinit() nogil
# sound devices
struct pjmedia_aud_dev_info:
char *name
int input_count
int output_count
struct pjmedia_aud_param:
pjmedia_dir dir
int play_id
int rec_id
int clock_rate
int channel_count
int samples_per_frame
int bits_per_sample
int flags
int ec_enabled
int ec_tail_ms
struct pjmedia_aud_stream
int pjmedia_aud_dev_count() nogil
int pjmedia_aud_dev_get_info(int index, pjmedia_aud_dev_info *info) nogil
int pjmedia_aud_stream_get_param(pjmedia_aud_stream *strm, pjmedia_aud_param *param) nogil
enum pjmedia_aud_dev_event:
PJMEDIA_AUD_DEV_DEFAULT_INPUT_CHANGED
PJMEDIA_AUD_DEV_DEFAULT_OUTPUT_CHANGED
PJMEDIA_AUD_DEV_LIST_WILL_REFRESH
PJMEDIA_AUD_DEV_LIST_DID_REFRESH
ctypedef void (*pjmedia_aud_dev_observer_callback)(pjmedia_aud_dev_event event)
int pjmedia_aud_dev_set_observer_cb(pjmedia_aud_dev_observer_callback cb) nogil
int pjmedia_aud_dev_default_param(int index, pjmedia_aud_param *param) nogil
int pjmedia_aud_dev_refresh() nogil
# sound port
struct pjmedia_port
struct pjmedia_snd_port
struct pjmedia_snd_port_param:
pjmedia_aud_param base
ctypedef pjmedia_snd_port_param *pjmedia_snd_port_param_ptr_const "const pjmedia_snd_port_param *"
int pjmedia_snd_port_create2(pj_pool_t *pool, pjmedia_snd_port_param_ptr_const prm, pjmedia_snd_port **p_port) nogil
void pjmedia_snd_port_param_default(pjmedia_snd_port_param *prm)
int pjmedia_snd_port_connect(pjmedia_snd_port *snd_port, pjmedia_port *port) nogil
int pjmedia_snd_port_disconnect(pjmedia_snd_port *snd_port) nogil
int pjmedia_snd_port_set_ec(pjmedia_snd_port *snd_port, pj_pool_t *pool, unsigned int tail_ms, int options) nogil
int pjmedia_snd_port_reset_ec_state(pjmedia_snd_port *snd_port) nogil
int pjmedia_snd_port_destroy(pjmedia_snd_port *snd_port) nogil
pjmedia_aud_stream *pjmedia_snd_port_get_snd_stream(pjmedia_snd_port *snd_port) nogil
int pjmedia_null_port_create(pj_pool_t *pool, unsigned int sampling_rate, unsigned int channel_count,
unsigned int samples_per_frame, unsigned int bits_per_sample, pjmedia_port **p_port) nogil
int pjmedia_mixer_port_create(pj_pool_t *pool, unsigned int sampling_rate, unsigned int channel_count,
unsigned int samples_per_frame, unsigned int bits_per_sample, pjmedia_port **p_port) nogil
# master port
struct pjmedia_master_port
int pjmedia_master_port_create(pj_pool_t *pool, pjmedia_port *u_port, pjmedia_port *d_port,
unsigned int options, pjmedia_master_port **p_m) nogil
int pjmedia_master_port_start(pjmedia_master_port *m) nogil
int pjmedia_master_port_destroy(pjmedia_master_port *m, int destroy_ports) nogil
# conference bridge
enum pjmedia_conf_option:
PJMEDIA_CONF_NO_DEVICE
struct pjmedia_conf
int pjmedia_conf_create(pj_pool_t *pool, int max_slots, int sampling_rate, int channel_count,
int samples_per_frame, int bits_per_sample, int options, pjmedia_conf **p_conf) nogil
int pjmedia_conf_destroy(pjmedia_conf *conf) nogil
pjmedia_port *pjmedia_conf_get_master_port(pjmedia_conf *conf) nogil
int pjmedia_conf_add_port(pjmedia_conf *conf, pj_pool_t *pool, pjmedia_port *strm_port,
pj_str_t *name, unsigned int *p_slot) nogil
int pjmedia_conf_remove_port(pjmedia_conf *conf, unsigned int slot) nogil
int pjmedia_conf_connect_port(pjmedia_conf *conf, unsigned int src_slot, unsigned int sink_slot, int level) nogil
int pjmedia_conf_disconnect_port(pjmedia_conf *conf, unsigned int src_slot, unsigned int sink_slot) nogil
int pjmedia_conf_adjust_rx_level(pjmedia_conf *conf, unsigned slot, int adj_level) nogil
int pjmedia_conf_adjust_tx_level(pjmedia_conf *conf, unsigned slot, int adj_level) nogil
# sdp
enum:
PJMEDIA_MAX_SDP_FMT
enum:
PJMEDIA_MAX_SDP_ATTR
enum:
PJMEDIA_MAX_SDP_MEDIA
struct pjmedia_sdp_attr:
pj_str_t name
pj_str_t value
struct pjmedia_sdp_conn:
pj_str_t net_type
pj_str_t addr_type
pj_str_t addr
struct pjmedia_sdp_media_desc:
pj_str_t media
unsigned int port
unsigned int port_count
pj_str_t transport
unsigned int fmt_count
pj_str_t fmt[PJMEDIA_MAX_SDP_FMT]
struct pjmedia_sdp_media:
pjmedia_sdp_media_desc desc
pjmedia_sdp_conn *conn
unsigned int attr_count
pjmedia_sdp_attr *attr[PJMEDIA_MAX_SDP_ATTR]
struct pjmedia_sdp_session_origin:
pj_str_t user
unsigned int id
unsigned int version
pj_str_t net_type
pj_str_t addr_type
pj_str_t addr
struct pjmedia_sdp_session_time:
unsigned int start
unsigned int stop
struct pjmedia_sdp_session:
pjmedia_sdp_session_origin origin
pj_str_t name
pjmedia_sdp_conn *conn
pjmedia_sdp_session_time time
unsigned int attr_count
pjmedia_sdp_attr *attr[PJMEDIA_MAX_SDP_ATTR]
unsigned int media_count
pjmedia_sdp_media *media[PJMEDIA_MAX_SDP_MEDIA]
ctypedef pjmedia_sdp_session *pjmedia_sdp_session_ptr_const "const pjmedia_sdp_session *"
pjmedia_sdp_media *pjmedia_sdp_media_clone(pj_pool_t *pool, pjmedia_sdp_media *rhs) nogil
pjmedia_sdp_session *pjmedia_sdp_session_clone(pj_pool_t *pool, pjmedia_sdp_session_ptr_const sdp) nogil
int pjmedia_sdp_print(pjmedia_sdp_session_ptr_const sdp, char *buf, int length)
int pjmedia_sdp_parse(pj_pool_t *pool, char *buf, int length, pjmedia_sdp_session **p_sdp) nogil
# sdp negotiation
enum pjmedia_sdp_neg_state:
PJMEDIA_SDP_NEG_STATE_NULL
PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER
PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER
PJMEDIA_SDP_NEG_STATE_WAIT_NEGO
PJMEDIA_SDP_NEG_STATE_DONE
struct pjmedia_sdp_neg
int pjmedia_sdp_neg_create_w_local_offer(pj_pool_t *pool, pjmedia_sdp_session_ptr_const local, pjmedia_sdp_neg **neg) nogil
int pjmedia_sdp_neg_create_w_remote_offer(pj_pool_t *pool, pjmedia_sdp_session_ptr_const initial, pjmedia_sdp_session_ptr_const remote, pjmedia_sdp_neg **neg) nogil
int pjmedia_sdp_neg_set_local_answer(pj_pool_t *pool, pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const local) nogil
int pjmedia_sdp_neg_set_remote_answer(pj_pool_t *pool, pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const remote) nogil
int pjmedia_sdp_neg_set_remote_offer(pj_pool_t *pool, pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const remote) nogil
int pjmedia_sdp_neg_get_neg_remote(pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const *remote) nogil
int pjmedia_sdp_neg_get_neg_local(pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const *local) nogil
int pjmedia_sdp_neg_get_active_remote(pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const *remote) nogil
int pjmedia_sdp_neg_get_active_local(pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const *local) nogil
int pjmedia_sdp_neg_modify_local_offer (pj_pool_t *pool, pjmedia_sdp_neg *neg, pjmedia_sdp_session_ptr_const local) nogil
int pjmedia_sdp_neg_cancel_offer(pjmedia_sdp_neg *neg) nogil
int pjmedia_sdp_neg_negotiate(pj_pool_t *poll, pjmedia_sdp_neg *neg, int allow_asym) nogil
pjmedia_sdp_neg_state pjmedia_sdp_neg_get_state(pjmedia_sdp_neg *neg) nogil
char *pjmedia_sdp_neg_state_str(pjmedia_sdp_neg_state state) nogil
# transport
enum pjmedia_transport_type:
PJMEDIA_TRANSPORT_TYPE_ICE
PJMEDIA_TRANSPORT_TYPE_SRTP
struct pjmedia_sock_info:
pj_sockaddr rtp_addr_name
struct pjmedia_transport:
char *name
pjmedia_transport_type type
struct pjmedia_transport_specific_info:
pjmedia_transport_type type
char *buffer
struct pjmedia_transport_info:
pjmedia_sock_info sock_info
pj_sockaddr src_rtp_name
int specific_info_cnt
pjmedia_transport_specific_info *spc_info
struct pjmedia_srtp_info:
int active
void pjmedia_transport_info_init(pjmedia_transport_info *info) nogil
int pjmedia_transport_udp_create3(pjmedia_endpt *endpt, int af, char *name, pj_str_t *addr, int port,
unsigned int options, pjmedia_transport **p_tp) nogil
int pjmedia_transport_get_info(pjmedia_transport *tp, pjmedia_transport_info *info) nogil
int pjmedia_transport_close(pjmedia_transport *tp) nogil
int pjmedia_transport_media_create(pjmedia_transport *tp, pj_pool_t *sdp_pool, unsigned int options,
pjmedia_sdp_session *rem_sdp, unsigned int media_index) nogil
int pjmedia_transport_encode_sdp(pjmedia_transport *tp, pj_pool_t *sdp_pool, pjmedia_sdp_session *sdp,
pjmedia_sdp_session *rem_sdp, unsigned int media_index) nogil
int pjmedia_transport_media_start(pjmedia_transport *tp, pj_pool_t *tmp_pool, pjmedia_sdp_session *sdp_local,
pjmedia_sdp_session *sdp_remote, unsigned int media_index) nogil
int pjmedia_transport_media_stop(pjmedia_transport *tp) nogil
int pjmedia_endpt_create_sdp(pjmedia_endpt *endpt, pj_pool_t *pool, unsigned int stream_cnt,
pjmedia_sock_info *sock_info, pjmedia_sdp_session **p_sdp) nogil
# SRTP
enum pjmedia_srtp_use:
PJMEDIA_SRTP_MANDATORY
struct pjmedia_srtp_setting:
pjmedia_srtp_use use
void pjmedia_srtp_setting_default(pjmedia_srtp_setting *opt) nogil
int pjmedia_transport_srtp_create(pjmedia_endpt *endpt, pjmedia_transport *tp,
pjmedia_srtp_setting *opt, pjmedia_transport **p_tp) nogil
# ICE
struct pjmedia_ice_cb:
void on_ice_complete(pjmedia_transport *tp, pj_ice_strans_op op, int status) with gil
void on_ice_state(pjmedia_transport *tp, pj_ice_strans_state prev, pj_ice_strans_state curr) with gil
void on_ice_stop(pjmedia_transport *tp, char *reason, int err) with gil
int pjmedia_ice_create2(pjmedia_endpt *endpt, char *name, unsigned int comp_cnt, pj_ice_strans_cfg *cfg,
pjmedia_ice_cb *cb, unsigned int options, pjmedia_transport **p_tp) nogil
pj_ice_strans *pjmedia_ice_get_strans(pjmedia_transport *tp) with gil
# stream
enum pjmedia_dir:
PJMEDIA_DIR_ENCODING
PJMEDIA_DIR_DECODING
struct pjmedia_codec_param_setting:
unsigned int vad
struct pjmedia_codec_param:
pjmedia_codec_param_setting setting
struct pjmedia_stream_info:
pjmedia_codec_info fmt
pjmedia_codec_param *param
unsigned int tx_event_pt
struct pjmedia_rtcp_stream_stat_loss_type:
unsigned int burst
unsigned int random
struct pjmedia_rtcp_stream_stat:
unsigned int pkt
unsigned int bytes
unsigned int discard
unsigned int loss
unsigned int reorder
unsigned int dup
pj_math_stat loss_period
pjmedia_rtcp_stream_stat_loss_type loss_type
pj_math_stat jitter
struct pjmedia_rtcp_stat:
pjmedia_rtcp_stream_stat tx
pjmedia_rtcp_stream_stat rx
pj_math_stat rtt
struct pjmedia_stream
int pjmedia_stream_info_from_sdp(pjmedia_stream_info *si, pj_pool_t *pool, pjmedia_endpt *endpt,
pjmedia_sdp_session *local, pjmedia_sdp_session *remote, unsigned int stream_idx) nogil
int pjmedia_stream_create(pjmedia_endpt *endpt, pj_pool_t *pool, pjmedia_stream_info *info,
pjmedia_transport *tp, void *user_data, pjmedia_stream **p_stream) nogil
int pjmedia_stream_destroy(pjmedia_stream *stream) nogil
int pjmedia_stream_get_port(pjmedia_stream *stream, pjmedia_port **p_port) nogil
int pjmedia_stream_start(pjmedia_stream *stream) nogil
int pjmedia_stream_dial_dtmf(pjmedia_stream *stream, pj_str_t *ascii_digit) nogil
int pjmedia_stream_set_dtmf_callback(pjmedia_stream *stream,
void cb(pjmedia_stream *stream, void *user_data, int digit) with gil,
void *user_data) nogil
int pjmedia_stream_pause(pjmedia_stream *stream, pjmedia_dir dir) nogil
int pjmedia_stream_resume(pjmedia_stream *stream, pjmedia_dir dir) nogil
int pjmedia_stream_get_stat(pjmedia_stream *stream, pjmedia_rtcp_stat *stat) nogil
# wav player
enum:
PJMEDIA_FILE_NO_LOOP
int pjmedia_port_destroy(pjmedia_port *port) nogil
int pjmedia_wav_player_port_create(pj_pool_t *pool, char *filename, unsigned int ptime, unsigned int flags,
unsigned int buff_size, pjmedia_port **p_port) nogil
int pjmedia_wav_player_set_eof_cb(pjmedia_port *port, void *user_data,
int cb(pjmedia_port *port, void *usr_data) with gil) nogil
int pjmedia_wav_player_port_set_pos(pjmedia_port *port, unsigned int offset) nogil
# wav recorder
enum pjmedia_file_writer_option:
PJMEDIA_FILE_WRITE_PCM
int pjmedia_wav_writer_port_create(pj_pool_t *pool, char *filename, unsigned int clock_rate,
unsigned int channel_count, unsigned int samples_per_frame,
unsigned int bits_per_sample, unsigned int flags, int buff_size,
pjmedia_port **p_port) nogil
# tone generator
enum:
PJMEDIA_TONEGEN_MAX_DIGITS
struct pjmedia_tone_desc:
short freq1
short freq2
short on_msec
short off_msec
short volume
short flags
struct pjmedia_tone_digit:
char digit
short on_msec
short off_msec
short volume
int pjmedia_tonegen_create(pj_pool_t *pool, unsigned int clock_rate, unsigned int channel_count,
unsigned int samples_per_frame, unsigned int bits_per_sample,
unsigned int options, pjmedia_port **p_port) nogil
int pjmedia_tonegen_play(pjmedia_port *tonegen, unsigned int count, pjmedia_tone_desc *tones, unsigned int options) nogil
int pjmedia_tonegen_play_digits(pjmedia_port *tonegen, unsigned int count,
pjmedia_tone_digit *digits, unsigned int options) nogil
int pjmedia_tonegen_stop(pjmedia_port *tonegen) nogil
int pjmedia_tonegen_is_busy(pjmedia_port *tonegen) nogil
cdef extern from "pjmedia-codec.h":
# codecs
enum:
PJMEDIA_SPEEX_NO_NB
int pjmedia_codec_gsm_init(pjmedia_endpt *endpt) nogil
int pjmedia_codec_gsm_deinit() nogil
int pjmedia_codec_g722_init(pjmedia_endpt *endpt) nogil
int pjmedia_codec_g722_deinit() nogil
int pjmedia_codec_opus_init(pjmedia_endpt *endpt) nogil
int pjmedia_codec_opus_deinit() nogil
int pjmedia_codec_ilbc_init(pjmedia_endpt *endpt, int mode) nogil
int pjmedia_codec_ilbc_deinit() nogil
int pjmedia_codec_speex_init(pjmedia_endpt *endpt, int options, int quality, int complexity) nogil
int pjmedia_codec_speex_deinit() nogil
cdef extern from "pjsip.h":
# messages
enum pjsip_status_code:
PJSIP_SC_TSX_TIMEOUT
PJSIP_SC_TSX_TRANSPORT_ERROR
PJSIP_TLS_EUNKNOWN
PJSIP_TLS_EINVMETHOD
PJSIP_TLS_ECACERT
PJSIP_TLS_ECERTFILE
PJSIP_TLS_EKEYFILE
PJSIP_TLS_ECIPHER
PJSIP_TLS_ECTX
struct pjsip_transport
enum pjsip_uri_context_e:
PJSIP_URI_IN_CONTACT_HDR
struct pjsip_param:
pj_str_t name
pj_str_t value
struct pjsip_uri
struct pjsip_sip_uri:
pj_str_t user
pj_str_t passwd
pj_str_t host
int port
pj_str_t user_param
pj_str_t method_param
pj_str_t transport_param
int ttl_param
int lr_param
pj_str_t maddr_param
pjsip_param other_param
pjsip_param header_param
struct pjsip_name_addr:
pj_str_t display
pjsip_uri *uri
struct pjsip_media_type:
pj_str_t type
pj_str_t subtype
pjsip_param param
enum pjsip_method_e:
PJSIP_OPTIONS_METHOD
PJSIP_CANCEL_METHOD
PJSIP_OTHER_METHOD
struct pjsip_method:
pjsip_method_e id
pj_str_t name
struct pjsip_host_port:
pj_str_t host
int port
enum pjsip_hdr_e:
PJSIP_H_VIA
PJSIP_H_CALL_ID
PJSIP_H_CONTACT
PJSIP_H_CSEQ
PJSIP_H_EXPIRES
PJSIP_H_FROM
struct pjsip_hdr:
pjsip_hdr_e type
pj_str_t name
ctypedef pjsip_hdr *pjsip_hdr_ptr_const "const pjsip_hdr*"
struct pjsip_generic_array_hdr:
unsigned int count
pj_str_t *values
struct pjsip_generic_string_hdr:
pj_str_t name
pj_str_t hvalue
struct pjsip_cid_hdr:
pj_str_t id
struct pjsip_contact_hdr:
int star
pjsip_uri *uri
int q1000
int expires
pjsip_param other_param
struct pjsip_clen_hdr:
int len
struct pjsip_ctype_hdr:
pjsip_media_type media
struct pjsip_cseq_hdr:
int cseq
pjsip_method method
struct pjsip_generic_int_hdr:
int ivalue
ctypedef pjsip_generic_int_hdr pjsip_expires_hdr
struct pjsip_fromto_hdr:
pjsip_uri *uri
pj_str_t tag
pjsip_param other_param
struct pjsip_routing_hdr:
pjsip_name_addr name_addr
pjsip_param other_param
ctypedef pjsip_routing_hdr pjsip_route_hdr
struct pjsip_retry_after_hdr:
int ivalue
pjsip_param param
pj_str_t comment
struct pjsip_via_hdr:
pj_str_t transport
pjsip_host_port sent_by
int ttl_param
int rport_param
pj_str_t maddr_param
pj_str_t recvd_param
pj_str_t branch_param
pjsip_param other_param
pj_str_t comment
enum:
PJSIP_MAX_ACCEPT_COUNT
struct pjsip_msg_body:
pjsip_media_type content_type
struct pjsip_request_line:
pjsip_method method
pjsip_uri *uri
struct pjsip_status_line:
int code
pj_str_t reason
union pjsip_msg_line:
pjsip_request_line req
pjsip_status_line status
enum pjsip_msg_type_e:
PJSIP_REQUEST_MSG
PJSIP_RESPONSE_MSG
struct pjsip_msg:
pjsip_msg_type_e type
pjsip_msg_line line
pjsip_hdr hdr
pjsip_msg_body *body
struct pjsip_buffer:
char *start
char *cur
struct pjsip_tx_data_tp_info:
char *dst_name
int dst_port
pjsip_transport *transport
struct pjsip_tx_data:
pjsip_msg *msg
pj_pool_t *pool
pjsip_buffer buf
pjsip_tx_data_tp_info tp_info
struct pjsip_rx_data_tp_info:
pj_pool_t *pool
pjsip_transport *transport
struct pjsip_rx_data_pkt_info:
pj_time_val timestamp
char *packet
int len
char *src_name
int src_port
struct pjsip_rx_data_msg_info:
pjsip_msg *msg
pjsip_fromto_hdr *from_hdr "from"
pjsip_fromto_hdr *to_hdr "to"
pjsip_via_hdr *via
struct pjsip_rx_data:
pjsip_rx_data_pkt_info pkt_info
pjsip_rx_data_tp_info tp_info
pjsip_rx_data_msg_info msg_info
void *pjsip_hdr_clone(pj_pool_t *pool, void *hdr) nogil
void pjsip_msg_add_hdr(pjsip_msg *msg, pjsip_hdr *hdr) nogil
void *pjsip_msg_find_hdr(pjsip_msg *msg, pjsip_hdr_e type, void *start) nogil
void *pjsip_msg_find_hdr_by_name(pjsip_msg *msg, pj_str_t *name, void *start) nogil
void *pjsip_msg_find_remove_hdr_by_name(pjsip_msg *msg, pj_str_t *name, void *start) nogil
pjsip_generic_string_hdr *pjsip_generic_string_hdr_create(pj_pool_t *pool, pj_str_t *hname, pj_str_t *hvalue) nogil
pjsip_contact_hdr *pjsip_contact_hdr_create(pj_pool_t *pool) nogil
pjsip_expires_hdr *pjsip_expires_hdr_create(pj_pool_t *pool, int value) nogil
pjsip_msg_body *pjsip_msg_body_create(pj_pool_t *pool, pj_str_t *type, pj_str_t *subtype, pj_str_t *text) nogil
pjsip_route_hdr *pjsip_route_hdr_init(pj_pool_t *pool, void *mem) nogil
void pjsip_sip_uri_init(pjsip_sip_uri *url, int secure) nogil
int pjsip_tx_data_dec_ref(pjsip_tx_data *tdata) nogil
void pjsip_tx_data_add_ref(pjsip_tx_data *tdata) nogil
pj_str_t *pjsip_uri_get_scheme(pjsip_uri *uri) nogil
void *pjsip_uri_get_uri(pjsip_uri *uri) nogil
int pjsip_uri_print(pjsip_uri_context_e context, void *uri, char *buf, unsigned int size) nogil
int PJSIP_URI_SCHEME_IS_SIP(pjsip_sip_uri *uri) nogil
enum:
PJSIP_PARSE_URI_AS_NAMEADDR
pjsip_uri *pjsip_parse_uri(pj_pool_t *pool, char *buf, unsigned int size, unsigned int options) nogil
void pjsip_method_init_np(pjsip_method *m, pj_str_t *str) nogil
pj_str_t *pjsip_get_status_text(int status_code) nogil
int pjsip_print_body(pjsip_msg_body *msg_body, char **buf, int *len)
# module
enum pjsip_module_priority:
PJSIP_MOD_PRIORITY_APPLICATION
PJSIP_MOD_PRIORITY_DIALOG_USAGE
PJSIP_MOD_PRIORITY_TRANSPORT_LAYER
struct pjsip_event
struct pjsip_transaction
struct pjsip_module:
pj_str_t name
int id
int priority
int on_rx_request(pjsip_rx_data *rdata) with gil
int on_rx_response(pjsip_rx_data *rdata) with gil
int on_tx_request(pjsip_tx_data *tdata) with gil
int on_tx_response(pjsip_tx_data *tdata) with gil
void on_tsx_state(pjsip_transaction *tsx, pjsip_event *event) with gil
+ # transport manager
+ struct pjsip_tpmgr
+ struct pjsip_transport_state_info:
+ int status
+ enum pjsip_transport_state:
+ PJSIP_TP_STATE_CONNECTED
+ PJSIP_TP_STATE_DISCONNECTED
+ ctypedef pjsip_transport_state_info *pjsip_transport_state_info_ptr_const "const pjsip_transport_state_info *"
+ ctypedef void (*pjsip_tp_state_callback)(pjsip_transport *tp, pjsip_transport_state state, pjsip_transport_state_info_ptr_const info) with gil
+ int pjsip_tpmgr_set_state_cb(pjsip_tpmgr *mgr, pjsip_tp_state_callback cb)
+
# endpoint
struct pjsip_endpoint
int pjsip_endpt_create(pj_pool_factory *pf, char *name, pjsip_endpoint **endpt) nogil
void pjsip_endpt_destroy(pjsip_endpoint *endpt) nogil
pj_pool_t *pjsip_endpt_create_pool(pjsip_endpoint *endpt, char *pool_name, int initial, int increment) nogil
void pjsip_endpt_release_pool(pjsip_endpoint *endpt, pj_pool_t *pool) nogil
int pjsip_endpt_handle_events(pjsip_endpoint *endpt, pj_time_val *max_timeout) nogil
int pjsip_endpt_register_module(pjsip_endpoint *endpt, pjsip_module *module) nogil
int pjsip_endpt_schedule_timer(pjsip_endpoint *endpt, pj_timer_entry *entry, pj_time_val *delay) nogil
void pjsip_endpt_cancel_timer(pjsip_endpoint *endpt, pj_timer_entry *entry) nogil
enum:
PJSIP_H_ACCEPT
PJSIP_H_ALLOW
PJSIP_H_SUPPORTED
pjsip_hdr_ptr_const pjsip_endpt_get_capability(pjsip_endpoint *endpt, int htype, pj_str_t *hname) nogil
int pjsip_endpt_add_capability(pjsip_endpoint *endpt, pjsip_module *mod, int htype,
pj_str_t *hname, unsigned count, pj_str_t *tags) nogil
int pjsip_endpt_create_response(pjsip_endpoint *endpt, pjsip_rx_data *rdata,
int st_code, pj_str_t *st_text, pjsip_tx_data **p_tdata) nogil
int pjsip_endpt_send_response2(pjsip_endpoint *endpt, pjsip_rx_data *rdata,
pjsip_tx_data *tdata, void *token, void *cb) nogil
int pjsip_endpt_respond_stateless(pjsip_endpoint *endpt, pjsip_rx_data *rdata,
int st_code, pj_str_t *st_text, pjsip_hdr *hdr_list, pjsip_msg_body *body) nogil
int pjsip_endpt_create_request(pjsip_endpoint *endpt, pjsip_method *method, pj_str_t *target, pj_str_t *frm,
pj_str_t *to, pj_str_t *contact, pj_str_t *call_id,
int cseq,pj_str_t *text, pjsip_tx_data **p_tdata) nogil
pj_timer_heap_t *pjsip_endpt_get_timer_heap(pjsip_endpoint *endpt) nogil
int pjsip_endpt_create_resolver(pjsip_endpoint *endpt, pj_dns_resolver **p_resv) nogil
int pjsip_endpt_set_resolver(pjsip_endpoint *endpt, pj_dns_resolver *resv) nogil
pj_dns_resolver* pjsip_endpt_get_resolver(pjsip_endpoint *endpt) nogil
+ pjsip_tpmgr* pjsip_endpt_get_tpmgr(pjsip_endpoint *endpt)
# transports
enum pjsip_ssl_method:
PJSIP_TLSV1_METHOD
struct pjsip_transport:
char *type_name
pjsip_host_port local_name
struct pjsip_tpfactory:
pjsip_host_port addr_name
int destroy(pjsip_tpfactory *factory) nogil
struct pjsip_tls_setting:
pj_str_t ca_list_file
pj_str_t cert_file
pj_str_t privkey_file
int method
int verify_server
pj_time_val timeout
enum pjsip_tpselector_type:
PJSIP_TPSELECTOR_TRANSPORT
union pjsip_tpselector_u:
pjsip_transport *transport
struct pjsip_tpselector:
pjsip_tpselector_type type
pjsip_tpselector_u u
int pjsip_transport_shutdown(pjsip_transport *tp) nogil
int pjsip_udp_transport_start(pjsip_endpoint *endpt, pj_sockaddr_in *local, pjsip_host_port *a_name,
unsigned int async_cnt, pjsip_transport **p_transport) nogil
int pjsip_tcp_transport_start2(pjsip_endpoint *endpt, pj_sockaddr_in *local, pjsip_host_port *a_name,
unsigned int async_cnt, pjsip_tpfactory **p_tpfactory) nogil
int pjsip_tls_transport_start(pjsip_endpoint *endpt, pjsip_tls_setting *opt, pj_sockaddr_in *local,
pjsip_host_port *a_name, unsigned async_cnt, pjsip_tpfactory **p_factory) nogil
void pjsip_tls_setting_default(pjsip_tls_setting *tls_opt) nogil
int pjsip_transport_shutdown(pjsip_transport *tp) nogil
# transaction layer
enum pjsip_role_e:
PJSIP_ROLE_UAC
PJSIP_ROLE_UAS
enum pjsip_tsx_state_e:
PJSIP_TSX_STATE_TRYING
PJSIP_TSX_STATE_PROCEEDING
PJSIP_TSX_STATE_COMPLETED
PJSIP_TSX_STATE_TERMINATED
struct pjsip_transaction:
int status_code
pj_str_t status_text
pjsip_role_e role
pjsip_tx_data *last_tx
pjsip_tsx_state_e state
void **mod_data
pjsip_method method
int pjsip_tsx_layer_init_module(pjsip_endpoint *endpt) nogil
int pjsip_tsx_create_key(pj_pool_t *pool, pj_str_t *key, pjsip_role_e role,
pjsip_method *method, pjsip_rx_data *rdata) nogil
pjsip_transaction *pjsip_tsx_layer_find_tsx(pj_str_t *key, int lock) nogil
int pjsip_tsx_create_uac(pjsip_module *tsx_user, pjsip_tx_data *tdata, pjsip_transaction **p_tsx) nogil
int pjsip_tsx_terminate(pjsip_transaction *tsx, int code) nogil
int pjsip_tsx_send_msg(pjsip_transaction *tsx, pjsip_tx_data *tdata) nogil
pjsip_transaction *pjsip_rdata_get_tsx(pjsip_rx_data *rdata) nogil
int pjsip_tsx_create_uas(pjsip_module *tsx_user, pjsip_rx_data *rdata, pjsip_transaction **p_tsx) nogil
void pjsip_tsx_recv_msg(pjsip_transaction *tsx, pjsip_rx_data *rdata) nogil
# event
enum pjsip_event_id_e:
PJSIP_EVENT_TSX_STATE
PJSIP_EVENT_RX_MSG
PJSIP_EVENT_TX_MSG
PJSIP_EVENT_TRANSPORT_ERROR
PJSIP_EVENT_TIMER
union pjsip_event_body_tsx_state_src:
pjsip_rx_data *rdata
pjsip_tx_data *tdata
struct pjsip_event_body_tsx_state:
pjsip_event_body_tsx_state_src src
pjsip_transaction *tsx
pjsip_event_id_e type
struct pjsip_event_body_rx_msg:
pjsip_rx_data *rdata
union pjsip_event_body:
pjsip_event_body_tsx_state tsx_state
pjsip_event_body_rx_msg rx_msg
struct pjsip_event:
pjsip_event_id_e type
pjsip_event_body body
int pjsip_endpt_send_request(pjsip_endpoint *endpt, pjsip_tx_data *tdata, int timeout,
void *token, void cb(void *token, pjsip_event *e) with gil) nogil
# auth
enum:
PJSIP_EFAILEDCREDENTIAL
enum pjsip_cred_data_type:
PJSIP_CRED_DATA_PLAIN_PASSWD
struct pjsip_cred_info:
pj_str_t realm
pj_str_t scheme
pj_str_t username
pjsip_cred_data_type data_type
pj_str_t data
struct pjsip_auth_clt_sess:
pass
int pjsip_auth_clt_init(pjsip_auth_clt_sess *sess, pjsip_endpoint *endpt, pj_pool_t *pool, unsigned int options) nogil
int pjsip_auth_clt_set_credentials(pjsip_auth_clt_sess *sess, int cred_cnt, pjsip_cred_info *c) nogil
int pjsip_auth_clt_reinit_req(pjsip_auth_clt_sess *sess, pjsip_rx_data *rdata,
pjsip_tx_data *old_request, pjsip_tx_data **new_request) nogil
# dialog layer
ctypedef pjsip_module pjsip_user_agent
struct pjsip_dlg_party:
pjsip_contact_hdr *contact
pjsip_fromto_hdr *info
struct pjsip_dialog:
pjsip_auth_clt_sess auth_sess
pjsip_cid_hdr *call_id
pj_pool_t *pool
pjsip_dlg_party local
pjsip_dlg_party remote
struct pjsip_ua_init_param:
pjsip_dialog *on_dlg_forked(pjsip_dialog *first_set, pjsip_rx_data *res) nogil
int pjsip_ua_init_module(pjsip_endpoint *endpt, pjsip_ua_init_param *prm) nogil
pjsip_user_agent *pjsip_ua_instance() nogil
int pjsip_dlg_create_uac(pjsip_user_agent *ua, pj_str_t *local_uri, pj_str_t *local_contact,
pj_str_t *remote_uri, pj_str_t *target, pjsip_dialog **p_dlg) nogil
int pjsip_dlg_set_route_set(pjsip_dialog *dlg, pjsip_route_hdr *route_set) nogil
int pjsip_dlg_create_uas(pjsip_user_agent *ua, pjsip_rx_data *rdata, pj_str_t *contact, pjsip_dialog **p_dlg) nogil
int pjsip_dlg_terminate(pjsip_dialog *dlg) nogil
int pjsip_dlg_set_transport(pjsip_dialog *dlg, pjsip_tpselector *sel) nogil
int pjsip_dlg_respond(pjsip_dialog *dlg, pjsip_rx_data *rdata, int st_code,
pj_str_t *st_text, pjsip_hdr *hdr_list, pjsip_msg_body *body) nogil
int pjsip_dlg_create_response(pjsip_dialog *dlg, pjsip_rx_data *rdata,
int st_code, pj_str_t *st_text, pjsip_tx_data **tdata) nogil
int pjsip_dlg_modify_response(pjsip_dialog *dlg, pjsip_tx_data *tdata, int st_code, pj_str_t *st_text) nogil
int pjsip_dlg_send_response(pjsip_dialog *dlg, pjsip_transaction *tsx, pjsip_tx_data *tdata) nogil
void pjsip_dlg_inc_lock(pjsip_dialog *dlg) nogil
void pjsip_dlg_dec_lock(pjsip_dialog *dlg) nogil
int pjsip_dlg_inc_session(pjsip_dialog *dlg, pjsip_module *mod) nogil
int pjsip_dlg_dec_session(pjsip_dialog *dlg, pjsip_module *mod) nogil
cdef extern from "pjsip-simple/evsub_msg.h":
struct pjsip_event_hdr:
pj_str_t event_type
pj_str_t id_param
pjsip_param other_param
struct pjsip_sub_state_hdr:
pj_str_t sub_state
pj_str_t reason_param
int expires_param
int retry_after
pjsip_param other_param
pjsip_event_hdr *pjsip_event_hdr_create(pj_pool_t *pool) nogil
cdef extern from "pjsip_simple.h":
# subscribe / notify
enum:
PJSIP_EVSUB_NO_EVENT_ID
enum pjsip_evsub_state:
PJSIP_EVSUB_STATE_PENDING
PJSIP_EVSUB_STATE_ACTIVE
PJSIP_EVSUB_STATE_TERMINATED
struct pjsip_evsub
struct pjsip_evsub_user:
void on_evsub_state(pjsip_evsub *sub, pjsip_event *event) with gil
void on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil
void on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text,
pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
void on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
pj_str_t **p_st_text,pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
void on_client_refresh(pjsip_evsub *sub) with gil
void on_server_timeout(pjsip_evsub *sub) with gil
int pjsip_evsub_init_module(pjsip_endpoint *endpt) nogil
int pjsip_evsub_register_pkg(pjsip_module *pkg_mod, pj_str_t *event_name,
unsigned int expires, unsigned int accept_cnt, pj_str_t *accept) nogil
int pjsip_evsub_create_uac(pjsip_dialog *dlg, pjsip_evsub_user *user_cb,
pj_str_t *event, int option, pjsip_evsub **p_evsub) nogil
int pjsip_evsub_create_uas(pjsip_dialog *dlg, pjsip_evsub_user *user_cb,
pjsip_rx_data *rdata, unsigned int option, pjsip_evsub **p_evsub) nogil
int pjsip_evsub_initiate(pjsip_evsub *sub, void *method, unsigned int expires, pjsip_tx_data **p_tdata) nogil
int pjsip_evsub_send_request(pjsip_evsub *sub, pjsip_tx_data *tdata) nogil
int pjsip_evsub_terminate(pjsip_evsub *sub, int notify) nogil
char *pjsip_evsub_get_state_name(pjsip_evsub *sub) nogil
void pjsip_evsub_set_mod_data(pjsip_evsub *sub, int mod_id, void *data) nogil
void *pjsip_evsub_get_mod_data(pjsip_evsub *sub, int mod_id) nogil
void pjsip_evsub_update_expires(pjsip_evsub *sub, int interval) nogil
void pjsip_evsub_set_timer(pjsip_evsub *sub, int timer_id, int seconds) nogil
pjsip_hdr *pjsip_evsub_get_allow_events_hdr(pjsip_module *m) nogil
int pjsip_evsub_notify(pjsip_evsub *sub, pjsip_evsub_state state,
pj_str_t *state_str, pj_str_t *reason, pjsip_tx_data **p_tdata) nogil
cdef extern from "pjsip_ua.h":
# 100rel / PRACK
int pjsip_100rel_init_module(pjsip_endpoint *endpt) nogil
# invite sessions
enum pjsip_inv_option:
PJSIP_INV_SUPPORT_100REL
enum pjsip_inv_state:
PJSIP_INV_STATE_INCOMING
PJSIP_INV_STATE_CONFIRMED
struct pjsip_inv_session:
pjsip_inv_state state
void **mod_data
pjmedia_sdp_neg *neg
int cause
pj_str_t cause_text
int cancelling
pjsip_transaction *invite_tsx
struct pjsip_inv_callback:
void on_state_changed(pjsip_inv_session *inv, pjsip_event *e) with gil
void on_new_session(pjsip_inv_session *inv, pjsip_event *e) with gil
void on_tsx_state_changed(pjsip_inv_session *inv, pjsip_transaction *tsx, pjsip_event *e) with gil
void on_media_update(pjsip_inv_session *inv, int status) with gil
int on_rx_reinvite(pjsip_inv_session *inv, pjmedia_sdp_session_ptr_const offer, pjsip_rx_data *rdata) with gil
int pjsip_inv_usage_init(pjsip_endpoint *endpt, pjsip_inv_callback *cb) nogil
int pjsip_inv_terminate(pjsip_inv_session *inv, int st_code, int notify) nogil
int pjsip_inv_end_session(pjsip_inv_session *inv, int st_code, pj_str_t *st_text, pjsip_tx_data **p_tdata) nogil
int pjsip_inv_cancel_reinvite(pjsip_inv_session *inv, pjsip_tx_data **p_tdata) nogil
int pjsip_inv_send_msg(pjsip_inv_session *inv, pjsip_tx_data *tdata) nogil
int pjsip_inv_verify_request(pjsip_rx_data *rdata, unsigned int *options, pjmedia_sdp_session *sdp,
pjsip_dialog *dlg, pjsip_endpoint *endpt, pjsip_tx_data **tdata) nogil
int pjsip_inv_create_uas(pjsip_dialog *dlg, pjsip_rx_data *rdata, pjmedia_sdp_session *local_sdp,
unsigned int options, pjsip_inv_session **p_inv) nogil
int pjsip_inv_initial_answer(pjsip_inv_session *inv, pjsip_rx_data *rdata, int st_code,
pj_str_t *st_text, pjmedia_sdp_session *sdp, pjsip_tx_data **p_tdata) nogil
int pjsip_inv_answer(pjsip_inv_session *inv, int st_code, pj_str_t *st_text,
pjmedia_sdp_session *local_sdp, pjsip_tx_data **p_tdata) nogil
int pjsip_inv_create_uac(pjsip_dialog *dlg, pjmedia_sdp_session *local_sdp,
unsigned int options, pjsip_inv_session **p_inv) nogil
int pjsip_inv_invite(pjsip_inv_session *inv, pjsip_tx_data **p_tdata) nogil
char *pjsip_inv_state_name(pjsip_inv_state state) nogil
int pjsip_inv_reinvite(pjsip_inv_session *inv, pj_str_t *new_contact,
pjmedia_sdp_session *new_offer, pjsip_tx_data **p_tdata) nogil
int pjsip_create_sdp_body(pj_pool_t *pool, pjmedia_sdp_session *sdp, pjsip_msg_body **p_body) nogil
# Replaces
struct pjsip_replaces_hdr:
pj_str_t call_id
pj_str_t to_tag
pj_str_t from_tag
int early_only
pjsip_param other_param
pjsip_replaces_hdr *pjsip_replaces_hdr_create(pj_pool_t *pool) nogil
int pjsip_replaces_verify_request(pjsip_rx_data *rdata, pjsip_dialog **p_dlg, int lock_dlg, pjsip_tx_data **p_tdata) nogil
int pjsip_replaces_init_module(pjsip_endpoint *endpt) nogil
# declarations
# core.util
cdef class frozenlist(object):
# attributes
cdef int initialized
cdef list list
cdef long hash
cdef class frozendict(object):
# attributes
cdef int initialized
cdef dict dict
cdef long hash
cdef class PJSTR(object):
# attributes
cdef pj_str_t pj_str
cdef object str
# core.lib
cdef class PJLIB(object):
# attributes
cdef int _init_done
cdef class PJCachingPool(object):
# attributes
cdef pj_caching_pool _obj
cdef int _init_done
cdef class PJSIPEndpoint(object):
# attributes
cdef pjsip_endpoint *_obj
cdef pj_pool_t *_pool
cdef pjsip_transport *_udp_transport
cdef pjsip_tpfactory *_tcp_transport
cdef pjsip_tpfactory *_tls_transport
cdef int _tls_verify_server
cdef PJSTR _tls_ca_file
cdef PJSTR _tls_cert_file
cdef PJSTR _tls_privkey_file
cdef object _local_ip_used
cdef int _tls_timeout
# private methods
cdef int _make_local_addr(self, pj_sockaddr_in *local_addr, object ip_address, int port) except -1
cdef int _start_udp_transport(self, int port) except -1
cdef int _stop_udp_transport(self) except -1
cdef int _start_tcp_transport(self, int port) except -1
cdef int _stop_tcp_transport(self) except -1
cdef int _start_tls_transport(self, port) except -1
cdef int _stop_tls_transport(self) except -1
cdef int _set_dns_nameservers(self, list servers) except -1
cdef class PJMEDIAEndpoint(object):
# attributes
cdef pjmedia_endpt *_obj
cdef int _has_speex
cdef int _has_g722
cdef int _has_g711
cdef int _has_ilbc
cdef int _has_gsm
cdef int _has_opus
# private methods
cdef list _get_codecs(self)
cdef list _get_all_codecs(self)
cdef list _get_current_codecs(self)
cdef int _set_codecs(self, list req_codecs) except -1
# core.helper
cdef class BaseCredentials(object):
# attributes
cdef pjsip_cred_info _credentials
# private methods
cdef pjsip_cred_info* get_cred_info(self)
cdef class Credentials(BaseCredentials):
# attributes
cdef str _username
cdef str _realm
cdef str _password
cdef class FrozenCredentials(BaseCredentials):
# attributes
cdef int initialized
cdef readonly str username
cdef readonly str realm
cdef readonly str password
cdef class BaseSIPURI(object):
pass
cdef class SIPURI(BaseSIPURI):
# attributes
cdef public object user
cdef public object password
cdef public object host
cdef public bint secure
cdef public dict parameters
cdef public dict headers
cdef object _port
cdef class FrozenSIPURI(BaseSIPURI):
# attributes
cdef int initialized
cdef readonly object user
cdef readonly object password
cdef readonly object host
cdef readonly object port
cdef readonly bint secure
cdef readonly frozendict parameters
cdef readonly frozendict headers
cdef SIPURI SIPURI_create(pjsip_sip_uri *base_uri)
cdef FrozenSIPURI FrozenSIPURI_create(pjsip_sip_uri *base_uri)
# core.headers
cdef class BaseHeader(object):
pass
cdef class Header(BaseHeader):
# attributes
cdef str _name
cdef str _body
cdef class FrozenHeader(BaseHeader):
# attributes
cdef readonly str name
cdef readonly str body
cdef class BaseContactHeader(object):
pass
cdef class ContactHeader(BaseContactHeader):
# attributes
cdef SIPURI _uri
cdef unicode _display_name
cdef dict _parameters
cdef class FrozenContactHeader(BaseContactHeader):
# attributes
cdef int initialized
cdef readonly FrozenSIPURI uri
cdef readonly unicode display_name
cdef readonly frozendict parameters
cdef class BaseContentTypeHeader(object):
pass
cdef class ContentTypeHeader(BaseContentTypeHeader):
# attributes
cdef str _content_type
cdef dict _parameters
cdef class FrozenContentTypeHeader(BaseContentTypeHeader):
# attributes
cdef int initialized
cdef readonly str _content_type
cdef readonly frozendict parameters
cdef class BaseIdentityHeader(object):
pass
cdef class IdentityHeader(BaseIdentityHeader):
# attributes
cdef SIPURI _uri
cdef public unicode display_name
cdef dict _parameters
cdef class FrozenIdentityHeader(BaseIdentityHeader):
# attributes
cdef int initialized
cdef readonly FrozenSIPURI uri
cdef readonly unicode display_name
cdef readonly frozendict parameters
cdef class FromHeader(IdentityHeader):
pass
cdef class FrozenFromHeader(FrozenIdentityHeader):
pass
cdef class ToHeader(IdentityHeader):
pass
cdef class FrozenToHeader(FrozenIdentityHeader):
pass
cdef class RouteHeader(IdentityHeader):
pass
cdef class FrozenRouteHeader(FrozenIdentityHeader):
pass
cdef class RecordRouteHeader(IdentityHeader):
pass
cdef class FrozenRecordRouteHeader(FrozenIdentityHeader):
pass
cdef class BaseRetryAfterHeader(object):
pass
cdef class RetryAfterHeader(BaseRetryAfterHeader):
# attributes
cdef public int seconds
cdef public str comment
cdef dict _parameters
cdef class FrozenRetryAfterHeader(BaseRetryAfterHeader):
# attributes
cdef int initialized
cdef readonly int seconds
cdef readonly str comment
cdef readonly frozendict parameters
cdef class BaseViaHeader(object):
pass
cdef class ViaHeader(BaseViaHeader):
# attributes
cdef str _transport
cdef str _host
cdef int _port
cdef dict _parameters
cdef class FrozenViaHeader(BaseViaHeader):
# attributes
cdef int initialized
cdef readonly str transport
cdef readonly str host
cdef readonly int port
cdef readonly frozendict parameters
cdef class BaseWarningHeader(object):
pass
cdef class WarningHeader(BaseWarningHeader):
# attributes
cdef int _code
cdef str _agent
cdef str _text
cdef class FrozenWarningHeader(BaseWarningHeader):
# attributes
cdef int initialized
cdef readonly int code
cdef readonly str agent
cdef readonly str text
cdef class BaseEventHeader(object):
pass
cdef class EventHeader(BaseEventHeader):
# attributes
cdef public event
cdef dict _parameters
cdef class FrozenEventHeader(BaseEventHeader):
# attributes
cdef int initialized
cdef readonly str event
cdef readonly frozendict parameters
cdef class BaseSubscriptionStateHeader(object):
pass
cdef class SubscriptionStateHeader(BaseSubscriptionStateHeader):
# attributes
cdef public state
cdef dict _parameters
cdef class FrozenSubscriptionStateHeader(BaseSubscriptionStateHeader):
# attributes
cdef int initialized
cdef readonly str state
cdef readonly frozendict parameters
cdef class BaseReasonHeader(object):
pass
cdef class ReasonHeader(BaseReasonHeader):
# attributes
cdef public str protocol
cdef public dict parameters
cdef class FrozenReasonHeader(BaseReasonHeader):
# attributes
cdef int initialized
cdef readonly str protocol
cdef readonly frozendict parameters
cdef class BaseReferToHeader(object):
pass
cdef class ReferToHeader(BaseReferToHeader):
# attributes
cdef public str uri
cdef dict _parameters
cdef class FrozenReferToHeader(BaseReferToHeader):
# attributes
cdef int initialized
cdef readonly str uri
cdef readonly frozendict parameters
cdef class BaseSubjectHeader(object):
pass
cdef class SubjectHeader(BaseSubjectHeader):
# attributes
cdef public unicode subject
cdef class FrozenSubjectHeader(BaseSubjectHeader):
# attributes
cdef int initialized
cdef readonly unicode subject
cdef class BaseReplacesHeader(object):
pass
cdef class ReplacesHeader(BaseReplacesHeader):
# attributes
cdef public str call_id
cdef public str from_tag
cdef public str to_tag
cdef public int early_only
cdef dict _parameters
cdef class FrozenReplacesHeader(BaseReplacesHeader):
# attributes
cdef int initialized
cdef readonly str call_id
cdef readonly str from_tag
cdef readonly str to_tag
cdef readonly int early_only
cdef readonly frozendict parameters
cdef Header Header_create(pjsip_generic_string_hdr *header)
cdef FrozenHeader FrozenHeader_create(pjsip_generic_string_hdr *header)
cdef ContactHeader ContactHeader_create(pjsip_contact_hdr *header)
cdef FrozenContactHeader FrozenContactHeader_create(pjsip_contact_hdr *header)
cdef ContentTypeHeader ContentTypeHeader_create(pjsip_ctype_hdr *header)
cdef FrozenContentTypeHeader FrozenContentTypeHeader_create(pjsip_ctype_hdr *header)
cdef FromHeader FromHeader_create(pjsip_fromto_hdr *header)
cdef FrozenFromHeader FrozenFromHeader_create(pjsip_fromto_hdr *header)
cdef ToHeader ToHeader_create(pjsip_fromto_hdr *header)
cdef FrozenToHeader FrozenToHeader_create(pjsip_fromto_hdr *header)
cdef RouteHeader RouteHeader_create(pjsip_routing_hdr *header)
cdef FrozenRouteHeader FrozenRouteHeader_create(pjsip_routing_hdr *header)
cdef RecordRouteHeader RecordRouteHeader_create(pjsip_routing_hdr *header)
cdef FrozenRecordRouteHeader FrozenRecordRouteHeader_create(pjsip_routing_hdr *header)
cdef RetryAfterHeader RetryAfterHeader_create(pjsip_retry_after_hdr *header)
cdef FrozenRetryAfterHeader FrozenRetryAfterHeader_create(pjsip_retry_after_hdr *header)
cdef ViaHeader ViaHeader_create(pjsip_via_hdr *header)
cdef FrozenViaHeader FrozenViaHeader_create(pjsip_via_hdr *header)
cdef EventHeader EventHeader_create(pjsip_event_hdr *header)
cdef FrozenEventHeader FrozenEventHeader_create(pjsip_event_hdr *header)
cdef SubscriptionStateHeader SubscriptionStateHeader_create(pjsip_sub_state_hdr *header)
cdef FrozenSubscriptionStateHeader FrozenSubscriptionStateHeader_create(pjsip_sub_state_hdr *header)
cdef ReferToHeader ReferToHeader_create(pjsip_generic_string_hdr *header)
cdef FrozenReferToHeader FrozenReferToHeader_create(pjsip_generic_string_hdr *header)
cdef SubjectHeader SubjectHeader_create(pjsip_generic_string_hdr *header)
cdef FrozenSubjectHeader FrozenSubjectHeader_create(pjsip_generic_string_hdr *header)
cdef ReplacesHeader ReplacesHeader_create(pjsip_replaces_hdr *header)
cdef FrozenReplacesHeader FrozenReplacesHeader_create(pjsip_replaces_hdr *header)
# core.util
cdef int _str_to_pj_str(object string, pj_str_t *pj_str) except -1
cdef object _pj_str_to_str(pj_str_t pj_str)
cdef object _pj_status_to_str(int status)
cdef object _pj_status_to_def(int status)
cdef dict _pjsip_param_to_dict(pjsip_param *param_list)
cdef int _dict_to_pjsip_param(object params, pjsip_param *param_list, pj_pool_t *pool)
cdef int _pjsip_msg_to_dict(pjsip_msg *msg, dict info_dict) except -1
cdef int _is_valid_ip(int af, object ip) except -1
cdef int _get_ip_version(object ip) except -1
cdef int _add_headers_to_tdata(pjsip_tx_data *tdata, object headers) except -1
cdef int _remove_headers_from_tdata(pjsip_tx_data *tdata, object headers) except -1
cdef int _BaseSIPURI_to_pjsip_sip_uri(BaseSIPURI uri, pjsip_sip_uri *pj_uri, pj_pool_t *pool) except -1
cdef int _BaseRouteHeader_to_pjsip_route_hdr(BaseIdentityHeader header, pjsip_route_hdr *pj_header, pj_pool_t *pool) except -1
# core.ua
ctypedef int (*timer_callback)(object, object) except -1 with gil
cdef class Timer(object):
# attributes
cdef int _scheduled
cdef double schedule_time
cdef timer_callback callback
cdef object obj
# private methods
cdef int schedule(self, float delay, timer_callback callback, object obj) except -1
cdef int cancel(self) except -1
cdef int call(self) except -1
cdef class PJSIPThread(object):
# attributes
cdef pj_thread_t *_obj
cdef long _thread_desc[PJ_THREAD_DESC_SIZE]
cdef class PJSIPUA(object):
# attributes
cdef object _threads
cdef object _event_handler
cdef list _timers
cdef PJLIB _pjlib
cdef PJCachingPool _caching_pool
cdef PJSIPEndpoint _pjsip_endpoint
cdef PJMEDIAEndpoint _pjmedia_endpoint
cdef pjsip_module _module
cdef PJSTR _module_name
cdef pjsip_module _trace_module
cdef PJSTR _trace_module_name
cdef pjsip_module _ua_tag_module
cdef PJSTR _ua_tag_module_name
cdef pjsip_module _event_module
cdef PJSTR _event_module_name
cdef int _trace_sip
cdef int _detect_sip_loops
cdef PJSTR _user_agent
cdef object _events
cdef object _sent_messages
cdef int _rtp_port_start
cdef int _rtp_port_count
cdef int _rtp_port_usable_count
cdef int _rtp_port_index
cdef pj_stun_config _stun_cfg
cdef int _fatal_error
cdef set _incoming_events
cdef set _incoming_requests
cdef pj_rwmutex_t *audio_change_rwlock
cdef list old_devices
# private methods
cdef object _get_sound_devices(self, int is_output)
cdef object _get_default_sound_device(self, int is_output)
cdef int _poll_log(self) except -1
cdef int _handle_exception(self, int is_fatal) except -1
cdef int _check_self(self) except -1
cdef int _check_thread(self) except -1
cdef int _add_timer(self, Timer timer) except -1
cdef int _remove_timer(self, Timer timer) except -1
cdef int _cb_rx_request(self, pjsip_rx_data *rdata) except 0
cdef pj_pool_t* create_memory_pool(self, bytes name, int initial_size, int resize_size)
cdef void release_memory_pool(self, pj_pool_t* pool)
cdef void reset_memory_pool(self, pj_pool_t* pool)
cdef int _PJSIPUA_cb_rx_request(pjsip_rx_data *rdata) with gil
cdef void _cb_detect_nat_type(void *user_data, pj_stun_nat_detect_result_ptr_const res) with gil
cdef int _cb_trace_rx(pjsip_rx_data *rdata) with gil
cdef int _cb_trace_tx(pjsip_tx_data *tdata) with gil
cdef int _cb_add_user_agent_hdr(pjsip_tx_data *tdata) with gil
cdef int _cb_add_server_hdr(pjsip_tx_data *tdata) with gil
cdef PJSIPUA _get_ua()
cdef int deallocate_weakref(object weak_ref, object timer) except -1 with gil
# core.sound
cdef class AudioMixer(object):
# attributes
cdef int _input_volume
cdef int _output_volume
cdef bint _muted
cdef pj_mutex_t *_lock
cdef pj_pool_t *_conf_pool
cdef pj_pool_t *_snd_pool
cdef pjmedia_conf *_obj
cdef pjmedia_master_port *_master_port
cdef pjmedia_port *_null_port
cdef pjmedia_snd_port *_snd
cdef list _connected_slots
cdef readonly int ec_tail_length
cdef readonly int sample_rate
cdef readonly int slot_count
cdef readonly int used_slot_count
cdef readonly unicode input_device
cdef readonly unicode output_device
cdef readonly unicode real_input_device
cdef readonly unicode real_output_device
# private methods
cdef void _start_sound_device(self, PJSIPUA ua, unicode input_device, unicode output_device, int ec_tail_length)
cdef void _stop_sound_device(self, PJSIPUA ua)
cdef int _add_port(self, PJSIPUA ua, pj_pool_t *pool, pjmedia_port *port) except -1 with gil
cdef int _remove_port(self, PJSIPUA ua, unsigned int slot) except -1 with gil
cdef int _cb_postpoll_stop_sound(self, timer) except -1
cdef class ToneGenerator(object):
# attributes
cdef int _slot
cdef int _volume
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_port *_obj
cdef Timer _timer
cdef readonly AudioMixer mixer
# private methods
cdef PJSIPUA _get_ua(self, int raise_exception)
cdef int _stop(self, PJSIPUA ua) except -1
cdef int _cb_check_done(self, timer) except -1
cdef class RecordingWaveFile(object):
# attributes
cdef int _slot
cdef int _was_started
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_port *_port
cdef readonly str filename
cdef readonly AudioMixer mixer
# private methods
cdef PJSIPUA _check_ua(self)
cdef int _stop(self, PJSIPUA ua) except -1
cdef class WaveFile(object):
# attributes
cdef object __weakref__
cdef object weakref
cdef int _slot
cdef int _volume
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_port *_port
cdef readonly str filename
cdef readonly AudioMixer mixer
# private methods
cdef PJSIPUA _check_ua(self)
cdef int _stop(self, PJSIPUA ua, int notify) except -1
cdef int _cb_eof(self, timer) except -1
cdef class MixerPort(object):
cdef int _slot
cdef int _was_started
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_port *_port
cdef readonly AudioMixer mixer
# private methods
cdef PJSIPUA _check_ua(self)
cdef int _stop(self, PJSIPUA ua) except -1
cdef int _AudioMixer_dealloc_handler(object obj) except -1
cdef int cb_play_wav_eof(pjmedia_port *port, void *user_data) with gil
# core.event
cdef struct _core_event
cdef struct _handler_queue
cdef int _event_queue_append(_core_event *event)
cdef void _cb_log(int level, char_ptr_const data, int len)
cdef int _add_event(object event_name, dict params) except -1
cdef list _get_clear_event_queue()
cdef int _add_handler(int func(object obj) except -1, object obj, _handler_queue *queue) except -1
cdef int _remove_handler(object obj, _handler_queue *queue) except -1
cdef int _process_handler_queue(PJSIPUA ua, _handler_queue *queue) except -1
# core.request
cdef class EndpointAddress(object):
# attributes
cdef readonly bytes ip
cdef readonly int port
cdef class Request(object):
# attributes
cdef readonly object state
cdef PJSTR _method
cdef readonly EndpointAddress peer_address
cdef readonly FrozenCredentials credentials
cdef readonly FrozenFromHeader from_header
cdef readonly FrozenToHeader to_header
cdef readonly FrozenSIPURI request_uri
cdef readonly FrozenContactHeader contact_header
cdef readonly FrozenRouteHeader route_header
cdef PJSTR _call_id
cdef readonly int cseq
cdef readonly frozenlist extra_headers
cdef PJSTR _content_type
cdef PJSTR _content_subtype
cdef PJSTR _body
cdef pjsip_tx_data *_tdata
cdef pjsip_transaction *_tsx
cdef pjsip_auth_clt_sess _auth
cdef pjsip_route_hdr _route_header
cdef int _need_auth
cdef pj_timer_entry _timer
cdef int _timer_active
cdef int _expire_rest
cdef object _expire_time
cdef object _timeout
# private methods
cdef PJSIPUA _get_ua(self)
cdef int _cb_tsx_state(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_timer(self, PJSIPUA ua) except -1
cdef class IncomingRequest(object):
# attributes
cdef readonly str state
cdef pjsip_transaction *_tsx
cdef pjsip_tx_data *_tdata
cdef readonly EndpointAddress peer_address
# methods
cdef int init(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef void _Request_cb_tsx_state(pjsip_transaction *tsx, pjsip_event *event) with gil
cdef void _Request_cb_timer(pj_timer_heap_t *timer_heap, pj_timer_entry *entry) with gil
# core.referral
cdef class Referral(object):
# attributes
cdef pjsip_evsub *_obj
cdef pjsip_dialog *_dlg
cdef pjsip_route_hdr _route_header
cdef pj_list _route_set
cdef int _create_subscription
cdef readonly object state
cdef pj_timer_entry _timeout_timer
cdef int _timeout_timer_active
cdef pj_timer_entry _refresh_timer
cdef int _refresh_timer_active
cdef readonly EndpointAddress peer_address
cdef readonly FrozenFromHeader from_header
cdef readonly FrozenToHeader to_header
cdef readonly FrozenReferToHeader refer_to_header
cdef readonly FrozenRouteHeader route_header
cdef readonly FrozenCredentials credentials
cdef readonly FrozenContactHeader local_contact_header
cdef readonly FrozenContactHeader remote_contact_header
cdef readonly int refresh
cdef readonly frozenlist extra_headers
cdef pj_time_val _request_timeout
cdef int _want_end
cdef int _term_code
cdef object _term_reason
# private methods
cdef PJSIPUA _get_ua(self)
cdef int _update_contact_header(self, BaseContactHeader contact_header) except -1
cdef int _cancel_timers(self, PJSIPUA ua, int cancel_timeout, int cancel_refresh) except -1
cdef int _send_refer(self, PJSIPUA ua, pj_time_val *timeout, FrozenReferToHeader refer_to_header, frozenlist extra_headers) except -1
cdef int _send_subscribe(self, PJSIPUA ua, int expires, pj_time_val *timeout, frozenlist extra_headers) except -1
cdef int _cb_state(self, PJSIPUA ua, object state, int code, str reason) except -1
cdef int _cb_got_response(self, PJSIPUA ua, pjsip_rx_data *rdata, str method) except -1
cdef int _cb_notify(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_timeout_timer(self, PJSIPUA ua)
cdef int _cb_refresh_timer(self, PJSIPUA ua)
cdef class IncomingReferral(object):
cdef pjsip_evsub *_obj
cdef pjsip_dialog *_dlg
cdef pjsip_tx_data *_initial_response
cdef pjsip_transaction *_initial_tsx
cdef pj_time_val _expires_time
cdef int _create_subscription
cdef readonly str state
cdef readonly EndpointAddress peer_address
cdef readonly FrozenContactHeader local_contact_header
cdef readonly FrozenContactHeader remote_contact_header
cdef PJSTR _content
cdef int init(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef PJSIPUA _get_ua(self, int raise_exception)
cdef int _set_content(self, int code, str reason) except -1
cdef int _set_state(self, str state) except -1
cdef int _send_initial_response(self, int code) except -1
cdef int _send_notify(self) except -1
cdef int _terminate(self, PJSIPUA ua, int do_cleanup) except -1
cdef int _cb_rx_refresh(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_server_timeout(self, PJSIPUA ua) except -1
cdef int _cb_tsx(self, PJSIPUA ua, pjsip_event *event) except -1
cdef void _Referral_cb_state(pjsip_evsub *sub, pjsip_event *event) with gil
cdef void _Referral_cb_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _Referral_cb_refresh(pjsip_evsub *sub) with gil
cdef void _IncomingReferral_cb_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _IncomingReferral_cb_server_timeout(pjsip_evsub *sub) with gil
cdef void _IncomingReferral_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil
# core.subscription
cdef class Subscription(object):
# attributes
cdef pjsip_evsub *_obj
cdef pjsip_dialog *_dlg
cdef pjsip_route_hdr _route_header
cdef pj_list _route_set
cdef pj_timer_entry _timeout_timer
cdef int _timeout_timer_active
cdef pj_timer_entry _refresh_timer
cdef int _refresh_timer_active
cdef readonly object state
cdef readonly EndpointAddress peer_address
cdef readonly FrozenFromHeader from_header
cdef readonly FrozenToHeader to_header
cdef readonly FrozenContactHeader contact_header
cdef readonly object event
cdef readonly FrozenRouteHeader route_header
cdef readonly FrozenCredentials credentials
cdef readonly int refresh
cdef readonly frozenlist extra_headers
cdef readonly object body
cdef readonly object content_type
cdef readonly str call_id
cdef pj_time_val _subscribe_timeout
cdef int _want_end
cdef int _term_code
cdef object _term_reason
cdef int _expires
# private methods
cdef PJSIPUA _get_ua(self)
cdef int _cancel_timers(self, PJSIPUA ua, int cancel_timeout, int cancel_refresh) except -1
cdef int _send_subscribe(self, PJSIPUA ua, int expires, pj_time_val *timeout,
object extra_headers, object content_type, object body) except -1
cdef int _cb_state(self, PJSIPUA ua, object state, int code, object reason, dict headers) except -1
cdef int _cb_got_response(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_notify(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_timeout_timer(self, PJSIPUA ua)
cdef int _cb_refresh_timer(self, PJSIPUA ua)
cdef class IncomingSubscription(object):
# attributes
cdef pjsip_evsub *_obj
cdef pjsip_dialog *_dlg
cdef PJSTR _content_type
cdef PJSTR _content_subtype
cdef PJSTR _content
cdef pjsip_tx_data *_initial_response
cdef pjsip_transaction *_initial_tsx
cdef int _expires
cdef readonly str state
cdef readonly str event
cdef readonly str call_id
cdef readonly EndpointAddress peer_address
# methods
cdef int _set_state(self, str state) except -1
cdef PJSIPUA _get_ua(self, int raise_exception)
cdef int init(self, PJSIPUA ua, pjsip_rx_data *rdata, str event) except -1
cdef int _send_initial_response(self, int code) except -1
cdef int _send_notify(self, str reason=*) except -1
cdef int _terminate(self, PJSIPUA ua, str reason, int do_cleanup) except -1
cdef int _cb_rx_refresh(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int _cb_server_timeout(self, PJSIPUA ua) except -1
cdef int _cb_tsx(self, PJSIPUA ua, pjsip_event *event) except -1
cdef void _Subscription_cb_state(pjsip_evsub *sub, pjsip_event *event) with gil
cdef void _Subscription_cb_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _Subscription_cb_refresh(pjsip_evsub *sub) with gil
cdef void _IncomingSubscription_cb_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
int *p_st_code, pj_str_t **p_st_text,
pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _IncomingSubscription_cb_server_timeout(pjsip_evsub *sub) with gil
cdef void _IncomingSubscription_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil
# core.sdp
cdef class BaseSDPConnection(object):
# attributes
cdef pjmedia_sdp_conn _sdp_connection
# private methods
cdef pjmedia_sdp_conn* get_sdp_connection(self)
cdef class SDPConnection(BaseSDPConnection):
# attributes
cdef str _address
cdef str _net_type
cdef str _address_type
cdef class FrozenSDPConnection(BaseSDPConnection):
# attributes
cdef int initialized
cdef readonly str address
cdef readonly str net_type
cdef readonly str address_type
cdef class SDPAttributeList(list):
pass
cdef class FrozenSDPAttributeList(frozenlist):
pass
cdef class BaseSDPSession(object):
# attributes
cdef pjmedia_sdp_session _sdp_session
# private methods
cdef pjmedia_sdp_session* get_sdp_session(self)
cdef class SDPSession(BaseSDPSession):
# attributes
cdef str _address
cdef str _user
cdef str _net_type
cdef str _address_type
cdef str _name
cdef str _info
cdef SDPConnection _connection
cdef list _attributes
cdef list _media
# private methods
cdef int _update(self) except -1
cdef class FrozenSDPSession(BaseSDPSession):
# attributes
cdef int initialized
cdef readonly str address
cdef readonly unsigned int id
cdef readonly unsigned int version
cdef readonly str user
cdef readonly str net_type
cdef readonly str address_type
cdef readonly str name
cdef readonly str info
cdef readonly FrozenSDPConnection connection
cdef readonly int start_time
cdef readonly int stop_time
cdef readonly FrozenSDPAttributeList attributes
cdef readonly frozenlist media
cdef class BaseSDPMediaStream(object):
# attributes
cdef pjmedia_sdp_media _sdp_media
# private methods
cdef pjmedia_sdp_media* get_sdp_media(self)
cdef class SDPMediaStream(BaseSDPMediaStream):
# attributes
cdef str _media
cdef str _transport
cdef list _formats
cdef list _codec_list
cdef str _info
cdef SDPConnection _connection
cdef SDPAttributeList _attributes
# private methods
cdef int _update(self, SDPMediaStream media) except -1
cdef class FrozenSDPMediaStream(BaseSDPMediaStream):
# attributes
cdef int initialized
cdef readonly str media
cdef readonly int port
cdef readonly str transport
cdef readonly int port_count
cdef readonly frozenlist formats
cdef readonly frozenlist codec_list
cdef readonly str info
cdef readonly FrozenSDPConnection connection
cdef readonly FrozenSDPAttributeList attributes
cdef class BaseSDPAttribute(object):
# attributes
cdef pjmedia_sdp_attr _sdp_attribute
# private methods
cdef pjmedia_sdp_attr* get_sdp_attribute(self)
cdef class SDPAttribute(BaseSDPAttribute):
# attributes
cdef str _name
cdef str _value
cdef class FrozenSDPAttribute(BaseSDPAttribute):
# attributes
cdef int initialized
cdef readonly str name
cdef readonly str value
cdef SDPSession SDPSession_create(pjmedia_sdp_session_ptr_const pj_session)
cdef FrozenSDPSession FrozenSDPSession_create(pjmedia_sdp_session_ptr_const pj_session)
cdef SDPMediaStream SDPMediaStream_create(pjmedia_sdp_media *pj_media)
cdef FrozenSDPMediaStream FrozenSDPMediaStream_create(pjmedia_sdp_media *pj_media)
cdef SDPConnection SDPConnection_create(pjmedia_sdp_conn *pj_conn)
cdef FrozenSDPConnection FrozenSDPConnection_create(pjmedia_sdp_conn *pj_conn)
cdef SDPAttribute SDPAttribute_create(pjmedia_sdp_attr *pj_attr)
cdef FrozenSDPAttribute FrozenSDPAttribute_create(pjmedia_sdp_attr *pj_attr)
cdef class SDPNegotiator(object):
# attributes
cdef pjmedia_sdp_neg* _neg
cdef pj_pool_t *_pool
# core.invitation
cdef class SDPPayloads:
cdef readonly FrozenSDPSession proposed_local
cdef readonly FrozenSDPSession proposed_remote
cdef readonly FrozenSDPSession active_local
cdef readonly FrozenSDPSession active_remote
cdef class StateCallbackTimer(Timer):
cdef object state
cdef object sub_state
cdef object rdata
cdef object tdata
cdef object originator
cdef class SDPCallbackTimer(Timer):
cdef int status
cdef class TransferStateCallbackTimer(Timer):
cdef object state
cdef object code
cdef object reason
cdef class TransferResponseCallbackTimer(Timer):
cdef object method
cdef object rdata
cdef class TransferRequestCallbackTimer(Timer):
cdef object rdata
cdef class Invitation(object):
# attributes
cdef object __weakref__
cdef object weakref
cdef int _sdp_neg_status
cdef int _failed_response
cdef pj_list _route_set
cdef pj_mutex_t *_lock
cdef pjsip_inv_session *_invite_session
cdef pjsip_evsub *_transfer_usage
cdef pjsip_role_e _transfer_usage_role
cdef pjsip_dialog *_dialog
cdef pjsip_route_hdr _route_header
cdef pjsip_transaction *_reinvite_transaction
cdef PJSTR _sipfrag_payload
cdef Timer _timer
cdef Timer _transfer_timeout_timer
cdef Timer _transfer_refresh_timer
cdef readonly str call_id
cdef readonly str direction
cdef readonly str remote_user_agent
cdef readonly str state
cdef readonly str sub_state
cdef readonly str transport
cdef readonly str transfer_state
cdef readonly EndpointAddress peer_address
cdef readonly FrozenCredentials credentials
cdef readonly FrozenContactHeader local_contact_header
cdef readonly FrozenContactHeader remote_contact_header
cdef readonly FrozenFromHeader from_header
cdef readonly FrozenToHeader to_header
cdef readonly FrozenSIPURI request_uri
cdef readonly FrozenRouteHeader route_header
cdef readonly SDPPayloads sdp
# private methods
cdef int init_incoming(self, PJSIPUA ua, pjsip_rx_data *rdata, unsigned int inv_options) except -1
cdef int process_incoming_transfer(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef int process_incoming_options(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1
cdef PJSIPUA _check_ua(self)
cdef int _do_dealloc(self) except -1
cdef int _update_contact_header(self, BaseContactHeader contact_header) except -1
cdef int _fail(self, PJSIPUA ua) except -1
cdef int _cb_state(self, StateCallbackTimer timer) except -1
cdef int _cb_sdp_done(self, SDPCallbackTimer timer) except -1
cdef int _cb_timer_disconnect(self, timer) except -1
cdef int _cb_postpoll_fail(self, timer) except -1
cdef int _start_incoming_transfer(self, timer) except -1
cdef int _terminate_transfer(self) except -1
cdef int _terminate_transfer_uac(self) except -1
cdef int _terminate_transfer_uas(self) except -1
cdef int _set_transfer_state(self, str state) except -1
cdef int _set_sipfrag_payload(self, int code, str reason) except -1
cdef int _send_notify(self) except -1
cdef int _transfer_cb_timeout_timer(self, timer) except -1
cdef int _transfer_cb_refresh_timer(self, timer) except -1
cdef int _transfer_cb_state(self, TransferStateCallbackTimer timer) except -1
cdef int _transfer_cb_response(self, TransferResponseCallbackTimer timer) except -1
cdef int _transfer_cb_notify(self, TransferRequestCallbackTimer timer) except -1
cdef int _transfer_cb_server_timeout(self, timer) except -1
cdef void _Invitation_cb_state(pjsip_inv_session *inv, pjsip_event *e) with gil
cdef void _Invitation_cb_sdp_done(pjsip_inv_session *inv, int status) with gil
cdef int _Invitation_cb_rx_reinvite(pjsip_inv_session *inv, pjmedia_sdp_session_ptr_const offer, pjsip_rx_data *rdata) with gil
cdef void _Invitation_cb_tsx_state_changed(pjsip_inv_session *inv, pjsip_transaction *tsx, pjsip_event *e) with gil
cdef void _Invitation_cb_new(pjsip_inv_session *inv, pjsip_event *e) with gil
cdef void _Invitation_transfer_cb_state(pjsip_evsub *sub, pjsip_event *event) with gil
cdef void _Invitation_transfer_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil
cdef void _Invitation_transfer_cb_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _Invitation_transfer_cb_refresh(pjsip_evsub *sub) with gil
cdef void _Invitation_transfer_in_cb_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil
cdef void _Invitation_transfer_in_cb_server_timeout(pjsip_evsub *sub) with gil
# core.mediatransport
cdef class ICECandidate(object):
# attributes
cdef readonly str component
cdef readonly str type
cdef readonly str address
cdef readonly int port
cdef readonly int priority
cdef readonly str rel_address
cdef class ICECheck(object):
cdef readonly ICECandidate local_candidate
cdef readonly ICECandidate remote_candidate
cdef readonly str state
cdef readonly int nominated
cdef class RTPTransport(object):
# attributes
cdef object __weakref__
cdef object weakref
cdef int _af
cdef int _ice_active
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_transport *_obj
cdef pjmedia_transport *_wrapped_transport
cdef object _local_rtp_addr
cdef ICECheck _rtp_valid_pair
cdef readonly object ice_stun_address
cdef readonly object ice_stun_port
cdef readonly object srtp_forced
cdef readonly object state
cdef readonly object use_ice
cdef readonly object use_srtp
# private methods
cdef PJSIPUA _check_ua(self)
cdef void _get_info(self, pjmedia_transport_info *info)
cdef int _update_local_sdp(self, SDPSession local_sdp, int sdp_index, pjmedia_sdp_session *remote_sdp) except -1
cdef class MediaCheckTimer(Timer):
# attributes
cdef int media_check_interval
cdef class AudioTransport(object):
# attributes
cdef object __weakref__
cdef object weakref
cdef int _is_offer
cdef int _is_started
cdef int _slot
cdef int _volume
cdef unsigned int _packets_received
cdef unsigned int _vad
cdef pj_mutex_t *_lock
cdef pj_pool_t *_pool
cdef pjmedia_sdp_media *_local_media
cdef pjmedia_stream *_obj
cdef pjmedia_stream_info _stream_info
cdef dict _cached_statistics
cdef Timer _timer
cdef readonly object direction
cdef readonly AudioMixer mixer
cdef readonly RTPTransport transport
# private methods
cdef PJSIPUA _check_ua(self)
cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1 with gil
cdef void _RTPTransport_cb_ice_complete(pjmedia_transport *tp, pj_ice_strans_op op, int status) with gil
cdef void _RTPTransport_cb_ice_state(pjmedia_transport *tp, pj_ice_strans_state prev, pj_ice_strans_state curr) with gil
cdef void _RTPTransport_cb_ice_stop(pjmedia_transport *tp, char *reason, int err) with gil
cdef void _AudioTransport_cb_dtmf(pjmedia_stream *stream, void *user_data, int digit) with gil
cdef ICECandidate ICECandidate_create(pj_ice_sess_cand *cand)
cdef ICECheck ICECheck_create(pj_ice_sess_check *check)
cdef str _ice_state_to_str(int state)
cdef dict _extract_ice_session_data(pj_ice_sess *ice_sess)
cdef dict _pj_math_stat_to_dict(pj_math_stat *stat)
cdef dict _pjmedia_rtcp_stream_stat_to_dict(pjmedia_rtcp_stream_stat *stream_stat)
diff --git a/sipsimple/session.py b/sipsimple/session.py
index b4a2a501..03556288 100644
--- a/sipsimple/session.py
+++ b/sipsimple/session.py
@@ -1,2481 +1,2435 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements an asynchronous notification based mechanism for
establishment, modification and termination of sessions using Session
Initiation Protocol (SIP) standardized in RFC3261.
"""
from __future__ import absolute_import
__all__ = ['Session', 'SessionManager']
import random
from datetime import datetime
from threading import RLock
from time import time
from application.notification import IObserver, Notification, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.decorator import decorator, preserve_signature
from application.python.types import Singleton
from application.system import host
from eventlib import api, coros, proc
from twisted.internet import reactor
from zope.interface import implements
from sipsimple.core import DialogID, Engine, Invitation, Referral, Subscription, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, SIPURI, sip_status_messages, sipfrag_re
from sipsimple.core import ContactHeader, FromHeader, Header, ReasonHeader, ReferToHeader, ReplacesHeader, RouteHeader, SubjectHeader, ToHeader, WarningHeader
from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession
from sipsimple.account import AccountManager, BonjourAccount
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import PublicGRUU, PublicGRUUIfAvailable, NoGRUU
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import ParserError
from sipsimple.payloads.conference import ConferenceDocument
from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
class InvitationDisconnectedError(Exception):
def __init__(self, invitation, data):
self.invitation = invitation
self.data = data
class MediaStreamDidFailError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, timeout, **attributes):
self.error = error
self.timeout = timeout
self.attributes = attributes
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class InterruptSubscription(Exception):
pass
class TerminateSubscription(Exception):
pass
class ReferralError(Exception):
def __init__(self, error, code=0):
self.error = error
self.code = code
class TerminateReferral(Exception):
pass
class SIPReferralDidFail(Exception):
def __init__(self, data):
self.data = data
class IllegalStateError(RuntimeError):
pass
class IllegalDirectionError(Exception):
pass
class SIPInvitationTransferDidFail(Exception):
def __init__(self, data):
self.data = data
@decorator
def transition_state(required_state, new_state):
def state_transitioner(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
with obj._lock:
if obj.state != required_state:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
obj.state = new_state
return func(obj, *args, **kwargs)
return wrapper
return state_transitioner
@decorator
def check_state(required_states):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.state not in required_states:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
return func(obj, *args, **kwargs)
return wrapper
return state_checker
@decorator
def check_transfer_state(direction, state):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.transfer_handler.direction != direction:
raise IllegalDirectionError('cannot transfer in %s direction' % obj.transfer_handler.direction)
if obj.transfer_handler.state != state:
raise IllegalStateError('cannot transfer in %s state' % obj.transfer_handler.state)
return func(obj, *args, **kwargs)
return wrapper
return state_checker
class AddParticipantOperation(object):
pass
class RemoveParticipantOperation(object):
pass
class ReferralHandler(object):
implements(IObserver)
def __init__(self, session, participant_uri, operation):
self.participant_uri = participant_uri
if not isinstance(self.participant_uri, SIPURI):
if not self.participant_uri.startswith(('sip:', 'sips:')):
self.participant_uri = 'sip:%s' % self.participant_uri
try:
self.participant_uri = SIPURI.parse(self.participant_uri)
except SIPCoreError:
notification_center = NotificationCenter()
if operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI'))
return
self.session = session
self.operation = operation
self.active = False
self.route = None
self._channel = coros.queue()
self._referral = None
- self._wakeup_timer = None
def start(self):
notification_center = NotificationCenter()
if not self.session.remote_focus:
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus'))
self.session = None
return
notification_center.add_observer(self, sender=self.session)
- notification_center.add_observer(self, name='DNSNameserversDidChange')
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
proc.spawn(self._run)
def _run(self):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account is BonjourAccount():
uri = SIPURI.new(self.session._invitation.remote_contact_header.uri)
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
timeout = random.uniform(15, 30)
raise ReferralError(error='DNS lookup failed: %s' % e)
target_uri = SIPURI.new(self.session.remote_identity.uri)
timeout = time() + 30
for route in routes:
self.route = route
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = account.contact[NoGRUU, route]
except KeyError:
continue
refer_to_header = ReferToHeader(str(self.participant_uri))
refer_to_header.parameters['method'] = 'INVITE' if self.operation is AddParticipantOperation else 'BYE'
referral = Referral(target_uri, FromHeader(account.uri, account.display_name),
ToHeader(target_uri),
refer_to_header,
ContactHeader(contact_uri),
RouteHeader(route.uri),
account.credentials)
notification_center.add_observer(self, sender=referral)
try:
referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=referral)
timeout = 5
raise ReferralError(error='Internal error')
self._referral = referral
try:
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidStart':
break
except SIPReferralDidFail, e:
notification_center.remove_observer(self, sender=referral)
self._referral = None
if e.data.code in (403, 405):
raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
else:
# Otherwise just try the next route
continue
else:
break
else:
self.route = None
raise ReferralError(error='No more routes to try')
# At this point it is subscribed. Handle notifications and ending/failures.
try:
self.active = True
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralGotNotify':
if notification.data.event == 'refer' and notification.data.body:
match = sipfrag_re.match(notification.data.body)
if match:
code = int(match.group('code'))
reason = match.group('reason')
if code/100 > 2:
continue
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceGotAddParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason))
else:
notification_center.post_notification('SIPConferenceGotRemoveParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason))
elif notification.name == 'SIPReferralDidEnd':
break
except SIPReferralDidFail, e:
notification_center.remove_observer(self, sender=self._referral)
raise ReferralError(error=e.data.reason, code=e.data.code)
else:
notification_center.remove_observer(self, sender=self._referral)
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri))
else:
notification_center.post_notification('SIPConferenceDidRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri))
finally:
self.active = False
except TerminateReferral:
if self._referral is not None:
try:
self._referral.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidEnd':
break
except SIPReferralDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._referral)
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error'))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error'))
except ReferralError, e:
if self.operation is AddParticipantOperation:
notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error))
else:
notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error))
finally:
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
notification_center.remove_observer(self, sender=self.session)
- notification_center.remove_observer(self, name='DNSNameserversDidChange')
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self.session = None
self._referral = None
def _refresh(self):
try:
contact_header = ContactHeader(self.session.account.contact[NoGRUU, self.route])
except KeyError:
pass
else:
try:
self._referral.refresh(contact_header=contact_header, timeout=2)
except (SIPCoreError, SIPCoreInvalidStateError):
pass
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPReferralDidStart(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidEnd(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidFail(self, notification):
self._channel.send_exception(SIPReferralDidFail(notification.data))
def _NH_SIPReferralGotNotify(self, notification):
self._channel.send(notification)
def _NH_SIPSessionDidFail(self, notification):
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
self._channel.send_exception(TerminateReferral())
def _NH_SIPSessionWillEnd(self, notification):
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
self._channel.send_exception(TerminateReferral())
- def _NH_DNSNameserversDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._refresh()
- def _NH_SystemIPAddressDidChange(self, notification):
- if self.active:
- self._refresh()
-
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self.active:
- self._refresh()
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
class ConferenceHandler(object):
implements(IObserver)
def __init__(self, session):
self.session = session
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
- self._wakeup_timer = None
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
- notification_center.add_observer(self, name='DNSNameserversDidChange')
- notification_center.add_observer(self, name='SystemIPAddressDidChange')
- notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
@run_in_green_thread
def add_participant(self, participant_uri):
referral_handler = ReferralHandler(self.session, participant_uri, AddParticipantOperation)
referral_handler.start()
@run_in_green_thread
def remove_participant(self, participant_uri):
referral_handler = ReferralHandler(self.session, participant_uri, RemoveParticipantOperation)
referral_handler.start()
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _activate(self):
self.active = True
command = Command('subscribe')
self._command_channel.send(command)
return command
def _deactivate(self):
self.active = False
command = Command('unsubscribe')
self._command_channel.send(command)
return command
def _resubscribe(self):
command = Command('subscribe')
self._command_channel.send(command)
return command
def _terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session)
- notification_center.remove_observer(self, name='DNSNameserversDidChange')
- notification_center.remove_observer(self, name='SystemIPAddressDidChange')
- notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
+ notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._deactivate()
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self.session = None
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
- if self._wakeup_timer is not None and self._wakeup_timer.active():
- self._wakeup_timer.cancel()
- self._wakeup_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
command.signal()
raise proc.ProcExit()
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account is BonjourAccount():
uri = SIPURI.new(self.session._invitation.remote_contact_header.uri)
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
target_uri = SIPURI.new(self.session.remote_identity.uri)
default_interval = 600 if account is BonjourAccount() else account.sip.subscribe_interval
refresh_interval = getattr(command, 'refresh_interval', default_interval)
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = account.contact[NoGRUU, route]
except KeyError:
continue
subscription = Subscription(target_uri, FromHeader(account.uri, account.display_name),
ToHeader(target_uri),
ContactHeader(contact_uri),
'conference',
RouteHeader(route.uri),
credentials=account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
timeout = 5
raise SubscriptionError(error='Internal error', timeout=timeout)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail, e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
timeout = random.uniform(60, 120)
raise SubscriptionError(error='Authentication failed', timeout=timeout)
elif e.data.code == 423:
# Get the value of the Min-Expires header
timeout = random.uniform(60, 120)
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires)
else:
raise SubscriptionError(error='Interval too short', timeout=timeout)
elif e.data.code in (405, 406, 489, 1400):
command.signal(e)
return
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
timeout = random.uniform(60, 180)
raise SubscriptionError(error='No more routes to try', timeout=timeout)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
if notification.sender is not self._subscription:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'conference' and notification.data.body:
try:
conference_info = ConferenceDocument.parse(notification.data.body)
except ParserError:
pass
else:
notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info))
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription, e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
except TerminateSubscription, e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
except SubscriptionError, e:
if 'min_expires' in e.attributes:
command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires'])
else:
command = Command('subscribe', command.event)
self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_SIPSessionDidStart(self, notification):
if self.session.remote_focus:
self._activate()
@run_in_green_thread
def _NH_SIPSessionDidFail(self, notification):
self._terminate()
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
self._terminate()
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
if self.session.remote_focus and not self.active:
self._activate()
elif not self.session.remote_focus and self.active:
self._deactivate()
- def _NH_DNSNameserversDidChange(self, notification):
+ def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._resubscribe()
- def _NH_SystemIPAddressDidChange(self, notification):
- if self.active:
- self._resubscribe()
-
- def _NH_SystemDidWakeUpFromSleep(self, notification):
- if self._wakeup_timer is None:
- def wakeup_action():
- if self.active:
- self._resubscribe()
- self._wakeup_timer = None
- self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
-
class TransferInfo(object):
def __init__(self, referred_by=None, replaced_dialog_id=None):
self.referred_by = referred_by
self.replaced_dialog_id = replaced_dialog_id
class TransferHandler(object):
implements(IObserver)
def __init__(self, session):
self.direction = None
self.new_session = None
self.session = session
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, sender=self.session._invitation)
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._proc = proc.spawn(self._run)
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
self.direction = None
self.state = None
def _CH_incoming_transfer(self, command):
self.direction = 'incoming'
notification_center = NotificationCenter()
refer_to_hdr = command.data.headers.get('Refer-To')
target = SIPURI.parse(refer_to_hdr.uri)
referred_by_hdr = command.data.headers.get('Referred-By', None)
if referred_by_hdr is not None:
origin = referred_by_hdr.body
else:
origin = None
try:
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
self.state = 'failed'
return
else:
if notification.name == 'SIPInvitationTransferDidStart':
self.state = 'starting'
refer_to_uri = SIPURI.new(target)
refer_to_uri.headers = {}
refer_to_uri.parameters = {}
notification_center.post_notification('SIPSessionTransferNewIncoming', self.session, NotificationData(transfer_destination=refer_to_uri, transfer_source=origin))
elif notification.name == 'SIPSessionTransferDidStart':
break
elif notification.name == 'SIPSessionTransferDidFail':
self.state = 'failed'
try:
self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason)
except SIPCoreError:
return
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
self.state = 'started'
transfer_info = TransferInfo(referred_by=origin)
try:
replaces_hdr = target.headers.pop('Replaces')
call_id, rest = replaces_hdr.split(';', 1)
params = dict((item.split('=') for item in rest.split(';')))
to_tag = params.get('to-tag')
from_tag = params.get('from-tag')
except (KeyError, ValueError):
pass
else:
transfer_info.replaced_dialog_id = DialogID(call_id, local_tag=from_tag, remote_tag=to_tag)
settings = SIPSimpleSettings()
account = self.session.account
if account is BonjourAccount():
uri = target
elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = target
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
self.state = 'failed'
notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason))
try:
self.session._invitation.notify_transfer_progress(480)
except SIPCoreError:
pass
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
self.new_session = Session(account)
stream_registry = MediaStreamRegistry()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.new_session)
self.new_session.connect(ToHeader(target), routes=routes, streams=[stream_registry.AudioStream()], transfer_info=transfer_info)
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail:
return
if notification.name == 'SIPInvitationTransferDidEnd':
return
except proc.ProcExit:
if self.new_session is not None:
notification_center.remove_observer(self, sender=self.new_session)
self.new_session = None
raise
def _CH_outgoing_transfer(self, command):
self.direction = 'outgoing'
notification_center = NotificationCenter()
self.state = 'starting'
while True:
try:
notification = self._data_channel.wait()
except SIPInvitationTransferDidFail, e:
self.state = 'failed'
notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason))
return
if notification.name == 'SIPInvitationTransferDidStart':
self.state = 'started'
notification_center.post_notification('SIPSessionTransferDidStart', sender=self.session)
elif notification.name == 'SIPInvitationTransferDidEnd':
self.state = 'ended'
self.session.end()
notification_center.post_notification('SIPSessionTransferDidEnd', sender=self.session)
return
def _terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session._invitation)
notification_center.remove_observer(self, sender=self.session)
self._proc.kill()
self._proc = None
self._command_channel = None
self._data_channel = None
self.session = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPInvitationTransferNewIncoming(self, notification):
self._command_channel.send(Command('incoming_transfer', data=notification.data))
def _NH_SIPInvitationTransferNewOutgoing(self, notification):
self._command_channel.send(Command('outgoing_transfer', data=notification.data))
def _NH_SIPInvitationTransferDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPInvitationTransferDidFail(self, notification):
self._data_channel.send_exception(SIPInvitationTransferDidFail(notification.data))
def _NH_SIPInvitationTransferDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPInvitationTransferGotNotify(self, notification):
if notification.data.event == 'refer' and notification.data.body:
match = sipfrag_re.match(notification.data.body)
if match:
code = int(match.group('code'))
reason = match.group('reason')
notification.center.post_notification('SIPSessionTransferGotProgress', sender=self.session, data=NotificationData(code=code, reason=reason))
def _NH_SIPSessionTransferDidStart(self, notification):
if notification.sender is self.session and self.state == 'starting':
self._data_channel.send(notification)
def _NH_SIPSessionTransferDidFail(self, notification):
if notification.sender is self.session and self.state == 'starting':
self._data_channel.send(notification)
def _NH_SIPSessionGotRingIndication(self, notification):
if notification.sender is self.new_session and self.session is not None:
try:
self.session._invitation.notify_transfer_progress(180)
except SIPCoreError:
pass
def _NH_SIPSessionGotProvisionalResponse(self, notification):
if notification.sender is self.new_session and self.session is not None:
try:
self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason)
except SIPCoreError:
pass
def _NH_SIPSessionDidStart(self, notification):
if notification.sender is self.new_session:
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session)
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(200)
except SIPCoreError:
pass
self.state = 'ended'
self.session.end()
def _NH_SIPSessionDidEnd(self, notification):
if notification.sender is self.new_session:
# If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=500, reason='internal error'))
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(500)
except SIPCoreError:
pass
self.state = 'failed'
else:
self._terminate()
def _NH_SIPSessionDidFail(self, notification):
if notification.sender is self.new_session:
notification.center.remove_observer(self, sender=notification.sender)
self.new_session = None
if self.session is not None:
notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=notification.data.code or 500, reason=notification.data.reason))
if self.state == 'started':
try:
self.session._invitation.notify_transfer_progress(notification.data.code or 500, notification.data.reason)
except SIPCoreError:
pass
self.state = 'failed'
else:
self._terminate()
class SessionReplaceHandler(object):
implements(IObserver)
def __init__(self, session):
self.session = session
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, sender=self.session.replaced_session)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidStart(self, notification):
notification.center.remove_observer(self, sender=self.session)
notification.center.remove_observer(self, sender=self.session.replaced_session)
self.session.replaced_session.end()
self.session.replaced_session = None
self.session = None
def _NH_SIPSessionDidFail(self, notification):
if notification.sender is self.session:
notification.center.remove_observer(self, sender=self.session)
notification.center.remove_observer(self, sender=self.session.replaced_session)
self.session.replaced_session = None
self.session = None
_NH_SIPSessionDidEnd = _NH_SIPSessionDidFail
class Session(object):
implements(IObserver)
media_stream_timeout = 15
def __init__(self, account):
self.account = account
self.direction = None
self.end_time = None
self.on_hold = False
self.proposed_streams = None
self.route = None
self.state = None
self.start_time = None
self.streams = None
self.transport = None
self.local_focus = False
self.remote_focus = False
self.greenlet = None
self.conference = None
self.replaced_session = None
self.transfer_handler = None
self.transfer_info = None
self._channel = coros.queue()
self._hold_in_progress = False
self._invitation = None
self._local_identity = None
self._remote_identity = None
self._lock = RLock()
self.__dict__['subject'] = None
def init_incoming(self, invitation, data):
notification_center = NotificationCenter()
remote_sdp = invitation.sdp.proposed_remote
self.proposed_streams = []
if remote_sdp:
for index, media_stream in enumerate(remote_sdp.media):
if media_stream.port != 0:
for stream_type in MediaStreamRegistry():
try:
stream = stream_type.new_from_sdp(self, remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self.direction = 'incoming'
self.state = 'incoming'
self.transport = invitation.transport
self._invitation = invitation
self.conference = ConferenceHandler(self)
self.transfer_handler = TransferHandler(self)
if 'isfocus' in invitation.remote_contact_header.parameters:
self.remote_focus = True
try:
self.__dict__['subject'] = data.headers['Subject'].subject
except KeyError:
pass
if 'Referred-By' in data.headers or 'Replaces' in data.headers:
self.transfer_info = TransferInfo()
if 'Referred-By' in data.headers:
self.transfer_info.referred_by = data.headers['Referred-By'].body
if 'Replaces' in data.headers:
replaces_header = data.headers.get('Replaces')
replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag)
session_manager = SessionManager()
try:
self.replaced_session = (session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id).next()
except StopIteration:
invitation.send_response(481)
return
else:
self.transfer_info.replaced_dialog_id = replaced_dialog_id
replace_handler = SessionReplaceHandler(self)
replace_handler.start()
notification_center.add_observer(self, sender=invitation)
notification_center.post_notification('SIPSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams[:]))
else:
invitation.send_response(488)
@transition_state(None, 'connecting')
@run_in_green_thread
def connect(self, to_header, routes, streams, is_focus=False, subject=None, transfer_info=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
connected = False
received_code = 0
received_reason = None
unhandled_notifications = []
self.direction = 'outgoing'
self.proposed_streams = streams
self.route = routes[0]
self.transport = self.route.transport
self.local_focus = is_focus
self._invitation = Invitation()
self._local_identity = FromHeader(self.account.uri, self.account.display_name)
self._remote_identity = to_header
self.conference = ConferenceHandler(self)
self.transfer_handler = TransferHandler(self)
self.__dict__['subject'] = subject
self.transfer_info = transfer_info
notification_center.add_observer(self, sender=self._invitation)
notification_center.post_notification('SIPSessionNewOutgoing', sender=self, data=NotificationData(streams=streams[:]))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
try:
contact_uri = self.account.contact[PublicGRUUIfAvailable, self.route]
local_ip = host.outgoing_ip_for(self.route.address)
if local_ip is None:
raise ValueError("could not get outgoing IP address")
except (KeyError, ValueError), e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e))
return
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(for_offer=True)
media.connection = None
local_sdp.media.append(media)
from_header = FromHeader(self.account.uri, self.account.display_name)
route_header = RouteHeader(self.route.uri)
contact_header = ContactHeader(contact_uri)
if is_focus:
contact_header.parameters['isfocus'] = None
extra_headers = []
if self.subject is not None:
extra_headers.append(SubjectHeader(self.subject))
if self.transfer_info is not None:
extra_headers.append(Header('Referred-By', self.transfer_info.referred_by))
if self.transfer_info.replaced_dialog_id is not None:
dialog_id = self.transfer_info.replaced_dialog_id
extra_headers.append(ReplacesHeader(dialog_id.call_id, dialog_id.local_tag, dialog_id.remote_tag))
self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, self.account.credentials, extra_headers)
try:
with api.timeout(settings.sip.invite_timeout):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self)
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.greenlet = None
self.end()
return
notification_center.post_notification('SIPSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
invitation_notifications = []
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
invitation_notifications.append(notification)
[self._channel.send(notification) for notification in invitation_notifications]
while not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self)
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connecting':
received_code = notification.data.code
received_reason = notification.data.reason
elif notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except (MediaStreamDidFailError, api.TimeoutError), e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', code=0, reason=None, error=error)
except InvitationDisconnectedError, e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
# As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator))
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200]))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
else:
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason))
code = e.data.code
reason = e.data.reason
elif e.data.disconnect_reason == 'timeout':
code = 408
reason = 'timeout'
elif e.data.originator == 'local' and e.data.code == 408:
code = e.data.code
reason = e.data.reason
else:
code = 0
reason = None
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
self.greenlet = None
except SIPCoreError, e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@check_state(['incoming', 'received_proposal'])
@run_in_green_thread
def send_ring_indication(self):
try:
self._invitation.send_response(180)
except SIPCoreInvalidStateError:
pass # The INVITE session might have already been cancelled; ignore the error
@transition_state('incoming', 'accepting')
@run_in_green_thread
def accept(self, streams, is_focus=False):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
self.local_focus = is_focus
connected = False
unhandled_notifications = []
if self.proposed_streams:
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
else:
for index, stream in enumerate(streams):
notification_center.add_observer(self, sender=stream)
stream.index = index
stream.initialize(self, direction='outgoing')
self.proposed_streams = streams
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
sdp_connection = self._invitation.sdp.proposed_remote.connection or (media.connection for media in self._invitation.sdp.proposed_remote.media if media.connection is not None).next()
local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address
if local_ip is None:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address')
return
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
if self._invitation.sdp.proposed_remote:
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, media in enumerate(self._invitation.sdp.proposed_remote.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(for_offer=False)
media.connection = None
local_sdp.media.append(media)
else:
media = SDPMediaStream.new(media)
media.port = 0
media.attributes = []
local_sdp.media.append(media)
else:
for stream in self.proposed_streams:
media = stream.get_local_media(for_offer=True)
local_sdp.media.append(media)
contact_header = ContactHeader.new(self._invitation.local_contact_header)
try:
local_contact_uri = self.account.contact[PublicGRUU, self._invitation.transport]
except KeyError:
pass
else:
contact_header.uri = local_contact_uri
if is_focus:
contact_header.parameters['isfocus'] = None
self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp)
notification_center.post_notification('SIPSessionWillStart', sender=self)
# Local and remote SDPs will be set after the 200 OK is sent
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
if not connected:
# we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
while wait_count > 0 or not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
except (MediaStreamDidFailError, api.TimeoutError), e:
if self._invitation.state == 'connecting':
ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown'
# pjsip's invite session object does not inform us whether the ACK was received or not
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received))
elif self._invitation.state == 'connected' and not connected:
# we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True))
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
reason_header = None
if isinstance(e, api.TimeoutError) and wait_count > 0:
error = 'media stream timed out while starting'
elif isinstance(e, api.TimeoutError) and wait_count == 0:
error = 'No ACK received'
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'Missing ACK'
else:
error = 'media stream failed: %s' % e.data.reason
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'media stream failed to start'
self.start_time = datetime.now()
if self._invitation.state in ('incoming', 'early'):
self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header)
else:
self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header)
except InvitationDisconnectedError, e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if e.data.prev_state in ('incoming', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None))
elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote'))
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason='OK'))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason))
self.greenlet = None
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
except SIPCoreError, e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('incoming', 'terminating')
@run_in_green_thread
def reject(self, code=603, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'disconnected':
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received))
break
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
except SIPCoreError, e:
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
except api.TimeoutError:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None))
else:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None))
@transition_state('received_proposal', 'accepting_proposal')
@run_in_green_thread
def accept_proposal(self, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
streams = [stream for stream in streams if stream in self.proposed_streams]
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
wait_count = len(streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
stream_map = dict((stream.index, stream) for stream in streams)
for index, media in enumerate(self._invitation.sdp.proposed_remote.media):
stream = stream_map.get(index, None)
if stream is not None:
if index < len(local_sdp.media):
local_sdp.media[index] = stream.get_local_media(for_offer=False)
else:
local_sdp.media.append(stream.get_local_media(for_offer=False))
elif index >= len(local_sdp.media): # actually == is sufficient
media = SDPMediaStream.new(media)
media.port = 0
media.attributes = []
local_sdp.media.append(media)
self._invitation.send_response(200, sdp=local_sdp)
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
for stream in streams:
stream.start(local_sdp, remote_sdp, stream.index)
with api.timeout(self.media_stream_timeout):
wait_count = len(streams)
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
else:
unhandled_notifications.append(notification)
except (MediaStreamDidFailError, api.TimeoutError), e:
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail_proposal(originator='remote', error=error)
except InvitationDisconnectedError, e:
self._fail_proposal(originator='remote', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError, e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=self.proposed_streams[:]))
self.streams = self.streams + streams
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('received_proposal', 'rejecting_proposal')
@run_in_green_thread
def reject_proposal(self, code=488, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1, None):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown'))
except SIPCoreError, e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=self.proposed_streams[:]))
self.proposed_streams = None
if self._hold_in_progress:
self._send_hold()
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def add_stream(self, stream):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
received_code = None
received_reason = None
unhandled_notifications = []
self.proposed_streams = [stream]
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
while True:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
break
elif notification.name == 'SIPInvitationChangedState':
# This is actually the only reason for which this notification could be received
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self._fail_proposal(originator='local', error='received stream proposal')
self.handle_notification(notification)
return
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
stream.index = len(local_sdp.media)
local_sdp.media.append(stream.get_local_media(for_offer=True))
self._invitation.send_reinvite(sdp=local_sdp)
notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:]))
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(settings.sip.invite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
else:
self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if 200 <= notification.data.code < 300:
received_code = notification.data.code
received_reason = notification.data.reason
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=self.proposed_streams[:]))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
return
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.greenlet = None
self.cancel_proposal()
return
try:
remote_media = remote_sdp.media[stream.index]
except IndexError:
self._fail_proposal(originator='local', error='SDP media missing in answer')
return
else:
if remote_media.port:
stream.start(local_sdp, remote_sdp, stream.index)
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=received_code, reason=received_reason, proposed_streams=self.proposed_streams[:]))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
return
with api.timeout(self.media_stream_timeout):
wait_count = 1
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except (MediaStreamDidFailError, api.TimeoutError), e:
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail_proposal(originator='local', error=error)
except InvitationDisconnectedError, e:
self._fail_proposal(originator='local', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError, e:
self._fail_proposal(originator='local', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
proposed_streams = self.proposed_streams[:]
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=proposed_streams, proposed_streams=proposed_streams))
self.streams += self.proposed_streams
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=proposed_streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def remove_stream(self, stream):
if stream not in self.streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
self.streams.remove(stream)
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
local_sdp.media[stream.index].port = 0
local_sdp.media[stream.index].attributes = []
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if not (200 <= notification.data.code < 300):
break
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError, e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError:
raise #FIXME
else:
stream.end()
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=[stream]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('sending_proposal', 'cancelling_proposal')
@run_in_green_thread
def cancel_proposal(self):
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason))
if notification.data.code == 487:
for stream in self.proposed_streams:
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=self.proposed_streams[:]))
elif notification.data.code == 200:
self.end()
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
break
except SIPCoreError, e:
for stream in self.proposed_streams:
stream.deactivate()
stream.end()
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=self.proposed_streams[:]))
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
except InvitationDisconnectedError, e:
self.proposed_streams = None
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
else:
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
finally:
if self._hold_in_progress:
self._send_hold()
@run_in_green_thread
def hold(self):
if self.on_hold or self._hold_in_progress:
return
self._hold_in_progress = True
streams = self.streams if self.streams is not None else self.proposed_streams
if not streams:
return
for stream in streams:
stream.hold()
if self.state == 'connected':
self._send_hold()
@run_in_green_thread
def unhold(self):
if not self.on_hold and not self._hold_in_progress:
return
self._hold_in_progress = False
streams = self.streams if self.streams is not None else self.proposed_streams
if not streams:
return
for stream in streams:
stream.unhold()
if self.state == 'connected':
self._send_unhold()
@run_in_green_thread
def end(self):
if self.state in (None, 'terminating', 'terminated'):
return
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = None
notification_center = NotificationCenter()
if self._invitation is None or self._invitation.state is None:
# The invitation was not yet constructed
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
return
invitation_state = self._invitation.state
if invitation_state in ('disconnecting', 'disconnected'):
return
self.greenlet = api.getcurrent()
self.state = 'terminating'
if invitation_state == 'connected':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local'))
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
cancelling = invitation_state != 'connected' and self.direction == 'outgoing'
try:
self._invitation.end(timeout=1)
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if notification.data.disconnect_reason in ('internal error', 'missing ACK'):
pass
elif notification.data.disconnect_reason == 'timeout':
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local' if self.direction=='outgoing' else 'remote', method='INVITE', code=408, reason='Timeout'))
elif cancelling:
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif hasattr(notification.data, 'method'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200]))
elif notification.data.disconnect_reason == 'user request':
notification_center.post_notification('SIPSessionDidProcessTransaction', self,
NotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason))
break
except SIPCoreError, e:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
else:
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e)))
except InvitationDisconnectedError, e:
# As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state == 'connected':
if e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200]))
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200]))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
if e.data.originator == 'remote':
code = e.data.code
reason = e.data.reason
elif e.data.disconnect_reason == 'timeout':
code = 408
reason = 'timeout'
else:
code = 0
reason = None
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=code, reason=reason))
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
else:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
else:
self.end_time = datetime.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request'))
finally:
for stream in streams:
stream.end()
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
@check_state(['connected'])
@run_in_twisted_thread
def transfer(self, target_uri, replaced_session=None):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferNewOutgoing', self, NotificationData(transfer_destination=target_uri, transfer_source=self.local_identity.uri))
try:
self._invitation.transfer(target_uri, replaced_session._invitation.dialog_id if replaced_session is not None else None)
except SIPCoreError, e:
notification_center.post_notification('SIPSessionTransferDidFail', sender=self, data=NotificationData(code=500, reason=str(e)))
@check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal'])
@check_transfer_state('incoming', 'starting')
def accept_transfer(self):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferDidStart', sender=self)
@check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal'])
@check_transfer_state('incoming', 'starting')
def reject_transfer(self, code=486, reason=None):
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionTransferDidFail', self, NotificationData(code=code, reason=reason or sip_status_messages[code]))
@property
def local_identity(self):
if self._invitation is not None and self._invitation.local_identity is not None:
return self._invitation.local_identity
else:
return self._local_identity
@property
def peer_address(self):
return self._invitation.peer_address if self._invitation is not None else None
@property
def remote_identity(self):
if self._invitation is not None and self._invitation.remote_identity is not None:
return self._invitation.remote_identity
else:
return self._remote_identity
@property
def remote_user_agent(self):
return self._invitation.remote_user_agent if self._invitation is not None else None
@property
def subject(self):
return self.__dict__['subject']
def _send_hold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=True)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError, e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError, e:
raise #FIXME
else:
self.greenlet = None
self.on_hold = True
self.state = 'connected'
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams)))
for notification in unhandled_notifications:
self.handle_notification(notification)
if not self._hold_in_progress:
for stream in self.streams:
stream.unhold()
self._send_unhold()
else:
self._hold_in_progress = False
def _send_unhold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=True)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError, e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except SIPCoreError, e:
raise #FIXME
else:
self.greenlet = None
self.on_hold = False
self.state = 'connected'
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
for stream in self.streams:
stream.hold()
self._send_hold()
def _fail(self, originator, code, reason, error, reason_header=None):
notification_center = NotificationCenter()
prev_inv_state = self._invitation.state
self.state = 'terminating'
if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'):
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator))
if self._invitation.state not in (None, 'disconnecting', 'disconnected'):
try:
if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'):
if 400<=code<=699 and reason is not None:
self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else [])
else:
self._invitation.end(extra_headers=[reason_header] if reason_header is not None else [])
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
if prev_inv_state in ('connecting', 'connected'):
if notification.data.disconnect_reason in ('timeout', 'missing ACK'):
sip_code = 200
sip_reason = 'OK'
originator = 'local'
elif hasattr(notification.data, 'method'):
sip_code = 200
sip_reason = 'OK'
originator = 'remote'
else:
sip_code = notification.data.code
sip_reason = notification.data.reason
originator = 'local'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=originator, method='BYE', code=sip_code, reason=sip_reason))
elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'):
ack_received = notification.data.disconnect_reason != 'missing ACK'
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=reason, ack_received=ack_received))
elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled'))
break
except SIPCoreError:
pass
except api.TimeoutError:
if prev_inv_state in ('connecting', 'connected'):
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408]))
notification_center.remove_observer(self, sender=self._invitation)
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None))
self.greenlet = None
def _fail_proposal(self, originator, error):
notification_center = NotificationCenter()
for stream in self.proposed_streams:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
# _fail_proposal can be called from reject_proposal, which means the stream will
# not have been initialized or the session registered as an observer for it.
pass
else:
stream.deactivate()
stream.end()
if originator == 'remote' and self._invitation.sub_state == 'received_proposal':
try:
self._invitation.send_response(500)
except SIPCoreError:
pass
else:
notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown'))
notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:]))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
@run_in_green_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPInvitationChangedState(self, notification):
if self.state == 'terminated':
return
if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'):
contact_header = notification.data.headers.get('Contact', None)
if contact_header and 'isfocus' in contact_header[0].parameters:
self.remote_focus = True
if self.greenlet is not None:
if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting':
self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data))
else:
self._channel.send(notification)
else:
self.greenlet = api.getcurrent()
try:
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self.state = 'received_proposal'
try:
proposed_remote_sdp = self._invitation.sdp.proposed_remote
active_remote_sdp = self._invitation.sdp.active_remote
for stream in self.streams:
if not stream.validate_update(proposed_remote_sdp, stream.index):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)])
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
added_media_indexes = set()
removed_media_indexes = set()
for index, media_stream in enumerate(proposed_remote_sdp.media):
if index >= len(active_remote_sdp.media):
added_media_indexes.add(index)
elif media_stream.media != active_remote_sdp.media[index].media:
added_media_indexes.add(index)
removed_media_indexes.add(index)
elif not media_stream.port and active_remote_sdp.media[index].port:
removed_media_indexes.add(index)
removed_media_indexes.update(xrange(len(proposed_remote_sdp.media), len(active_remote_sdp.media)))
if added_media_indexes and removed_media_indexes:
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')])
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
elif added_media_indexes:
self.proposed_streams = []
for index in added_media_indexes:
media_stream = proposed_remote_sdp.media[index]
if media_stream.port != 0:
for stream_type in MediaStreamRegistry():
try:
stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self._invitation.send_response(100)
notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:]))
return
else:
self._invitation.send_response(488)
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
else:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes]
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
local_sdp.media[stream.index].port = 0
local_sdp.media[stream.index].attributes = []
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(for_offer=False)
try:
self._invitation.send_response(200, sdp=local_sdp)
except PJSIPError, e:
if 'PJMEDIA_SDPNEG' in str(e):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Changing the codec of an audio stream is currently not supported')])
self.state = 'connected'
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown'))
return
else:
raise
else:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
if removed_media_indexes:
notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams))
except InvitationDisconnectedError, e:
self.greenlet = None
self.state == 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
raise #FIXME
else:
self.state = 'connected'
elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request':
self.state = 'received_proposal_request'
try:
# An empty proposal was received, generate an offer
self._invitation.send_response(100)
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip)
if local_sdp.connection is not None:
local_sdp.connection.address = connection_address
for stream in self.streams:
stream.reset(stream.index)
media = stream.get_local_media(for_offer=True)
if media.connection is not None:
media.connection.address = connection_address
local_sdp.media[stream.index] = media
self._invitation.send_response(200, sdp=local_sdp)
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown'))
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except InvitationDisconnectedError, e:
self.greenlet = None
self.state == 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
raise #FIXME
else:
self.state = 'connected'
elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal':
if notification.data.originator == 'local' and notification.data.code == 487:
self.state = 'connected'
notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=self.proposed_streams[:]))
self.proposed_streams = None
if self._hold_in_progress:
self._send_hold()
elif notification.data.state == 'disconnected':
if self.state == 'incoming':
self.state = 'terminated'
if notification.data.originator == 'remote':
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown'))
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
# There must have been an error involved
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator))
for stream in self.streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if notification.data.originator == 'remote':
if hasattr(notification.data, 'method'):
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200]))
else:
notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason))
self.end_time = datetime.now()
notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason))
notification.center.remove_observer(self, sender=self._invitation)
finally:
self.greenlet = None
def _NH_SIPInvitationGotSDPUpdate(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidInitialize(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidStart(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidFail(self, notification):
if self.greenlet is not None:
if self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data))
else:
stream = notification.sender
if self.streams == [stream]:
self.end()
else:
try:
self.remove_stream(stream)
except IllegalStateError:
self.end()
class SessionManager(object):
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
self.sessions = []
self.state = None
self._channel = coros.queue()
def start(self):
self.state = 'starting'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillStart', sender=self)
notification_center.add_observer(self, 'SIPInvitationChangedState')
notification_center.add_observer(self, 'SIPSessionNewIncoming')
notification_center.add_observer(self, 'SIPSessionNewOutgoing')
notification_center.add_observer(self, 'SIPSessionDidFail')
notification_center.add_observer(self, 'SIPSessionDidEnd')
self.state = 'started'
notification_center.post_notification('SIPSessionManagerDidStart', sender=self)
def stop(self):
self.state = 'stopping'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillEnd', sender=self)
for session in self.sessions:
session.end()
while self.sessions:
self._channel.wait()
notification_center.remove_observer(self, 'SIPInvitationChangedState')
notification_center.remove_observer(self, 'SIPSessionNewIncoming')
notification_center.remove_observer(self, 'SIPSessionNewOutgoing')
notification_center.remove_observer(self, 'SIPSessionDidFail')
notification_center.remove_observer(self, 'SIPSessionDidEnd')
self.state = 'stopped'
notification_center.post_notification('SIPSessionManagerDidEnd', sender=self)
@run_in_twisted_thread
def handle_notification(self, notification):
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming':
account_manager = AccountManager()
account = account_manager.find_account(notification.data.request_uri)
if account is None:
notification.sender.send_response(404)
return
notification.sender.send_response(100)
session = Session(account)
session.init_incoming(notification.sender, notification.data)
elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'):
self.sessions.append(notification.sender)
elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'):
self.sessions.remove(notification.sender)
if self.state == 'stopping':
self._channel.send(notification)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:17 AM (13 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408664
Default Alt Text
(343 KB)

Event Timeline