Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159432
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
55 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sipsimple/account/publication.py b/sipsimple/account/publication.py
index d99ae4da..fa1a72b6 100644
--- a/sipsimple/account/publication.py
+++ b/sipsimple/account/publication.py
@@ -1,390 +1,390 @@
"""Implements the publisher handlers"""
__all__ = ['Publisher', 'PresencePublisher', 'DialogPublisher']
import random
from abc import ABCMeta, abstractproperty
from threading import Lock
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.types import MarkerType
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import FromHeader, Publication, PublicationETagError, RouteHeader, SIPURI, SIPCoreError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads.dialoginfo import DialogInfoDocument
from sipsimple.payloads.pidf import PIDFDocument
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('publish', refresh_interval=None)
class SameState(metaclass=MarkerType): pass
class SIPPublicationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPPublicationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class PublicationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class PublisherNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-9] if name.endswith('Publisher') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
@implementer(IObserver)
class Publisher(object, metaclass=ABCMeta):
__nickname__ = PublisherNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.publishing = False
self._lock = Lock()
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._publication = None
self._dns_wait = 1
self._publish_wait = 1
self._publication_timer = None
self.__dict__['state'] = None
@abstractproperty
def event(self):
return None
@abstractproperty
def payload_type(self):
return None
@property
def extra_headers(self):
return []
@property
def state(self):
return self.__dict__['state']
@state.setter
def state(self, state):
if state is not None and not isinstance(state, self.payload_type.root_element):
raise ValueError("state must be a %s document or None" % self.payload_type.root_element.__name__)
with self._lock:
old_state = self.__dict__['state']
self.__dict__['state'] = state
if state == old_state:
return
self._publish(state)
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('publish', state=self.state))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unpublish'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
@run_in_twisted_thread
def _publish(self, state):
if not self.active:
return
if state is None:
self._command_channel.send(Command('unpublish'))
else:
self._command_channel.send(Command('publish', state=state))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_publish(self, command):
if command.state is None or self._publication is None and command.state is SameState:
command.signal()
return
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
if self._publication is None:
duration = command.refresh_interval or self.account.sip.publish_interval
from_header = FromHeader(self.account.uri, self.account.display_name)
self._publication = Publication(from_header, self.event, self.payload_type.content_type, credentials=self.account.credentials, duration=duration, extra_headers=self.extra_headers)
notification_center.add_observer(self, sender=self._publication)
notification_center.post_notification(self.__class__.__name__ + 'WillPublish', sender=self, data=NotificationData(state=command.state, duration=duration))
else:
notification_center.post_notification(self.__class__.__name__ + 'WillRefresh', sender=self, data=NotificationData(state=command.state))
try:
# Lookup routes
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
except DNSLookupError as e:
retry_after = random.uniform(self._dns_wait, 2*self._dns_wait)
self._dns_wait = limit(2*self._dns_wait, max=30)
raise PublicationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
body = None if command.state is SameState else command.state.toxml()
# Publish by trying each route in turn
publish_timeout = time() + 30
for route in routes:
remaining_time = publish_timeout-time()
if remaining_time > 0:
try:
try:
self._publication.publish(body, RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
except ValueError as e: # this happens for an initial PUBLISH with body=None
raise PublicationError(str(e), retry_after=0)
except PublicationETagError:
state = self.state # access self.state only once to avoid race conditions
if state is not None:
self._publication.publish(state.toxml(), RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10))
else:
command.signal()
return
except SIPCoreError:
raise PublicationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidSucceed':
break
if notification.name == 'SIPPublicationDidEnd':
raise PublicationError('Publication expired', retry_after=0) # publication expired while we were trying to re-publish
except SIPPublicationDidFail as e:
if e.data.code == 407:
# Authentication failed, so retry the publication in some time
raise PublicationError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 412:
raise PublicationError('Conditional request failed', retry_after=0)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.publish_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise PublicationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise PublicationError('Method or event not supported', retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.publishing = True
self._publish_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the publication
retry_after = random.uniform(self._publish_wait, 2*self._publish_wait)
self._publish_wait = limit(self._publish_wait*2, max=30)
raise PublicationError('No more routes to try', retry_after=retry_after)
except PublicationError as e:
self.publishing = False
notification_center.remove_observer(self, sender=self._publication)
- def publish():
+ def publish(e):
if self.active:
self._command_channel.send(Command('publish', event=command.event, state=self.state, refresh_interval=e.refresh_interval))
else:
command.signal()
self._publication_timer = None
- self._publication_timer = reactor.callLater(e.retry_after, publish)
+ self._publication_timer = reactor.callLater(e.retry_after, publish, e)
self._publication = None
notification_center.post_notification(self.__nickname__ + 'PublicationDidFail', sender=self, data=NotificationData(reason=e.error))
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidSucceed', sender=self)
def _CH_unpublish(self, command):
# Cancel any timer which would restart the publication process
if self._publication_timer is not None and self._publication_timer.active():
self._publication_timer.cancel()
self._publication_timer = None
publishing = self.publishing
self.publishing = False
if self._publication is not None:
notification_center = NotificationCenter()
if publishing:
self._publication.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPPublicationDidEnd':
break
except (SIPPublicationDidFail, SIPPublicationDidNotEnd):
notification_center.post_notification(self.__nickname__ + 'PublicationDidNotEnd', sender=self)
else:
notification_center.post_notification(self.__nickname__ + 'PublicationDidEnd', sender=self)
notification_center.remove_observer(self, sender=self._publication)
self._publication = None
command.signal()
def _CH_terminate(self, command):
self._CH_unpublish(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPPublicationDidSucceed(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidFail(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidFail(notification.data))
def _NH_SIPPublicationDidEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send(notification)
def _NH_SIPPublicationDidNotEnd(self, notification):
if notification.sender is self._publication:
self._data_channel.send_exception(SIPPublicationDidNotEnd(notification.data))
def _NH_SIPPublicationWillExpire(self, notification):
if notification.sender is self._publication:
self._publish(SameState)
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.outbound_proxy', 'sip.transport_list', 'sip.publish_interval'}.intersection(notification.data.modified):
self._command_channel.send(Command('unpublish'))
self._command_channel.send(Command('publish', state=self.state))
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('unpublish'))
self._command_channel.send(Command('publish', state=self.state))
class PresencePublisher(Publisher):
"""A publisher for presence state"""
@property
def event(self):
return 'presence'
@property
def payload_type(self):
return PIDFDocument
def _NH_PresencePublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
class DialogPublisher(Publisher):
"""A publisher for dialog info state"""
@property
def event(self):
return 'dialog'
@property
def payload_type(self):
return DialogInfoDocument
def _NH_DialogPublisherDidStart(self, notification):
if self.account.presence.enabled:
self.activate()
diff --git a/sipsimple/account/registration.py b/sipsimple/account/registration.py
index 95b9beb0..37f881f4 100644
--- a/sipsimple/account/registration.py
+++ b/sipsimple/account/registration.py
@@ -1,309 +1,309 @@
"""Implements the registration handler"""
__all__ = ['Registrar']
import random
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import ContactHeader, FromHeader, Header, Registration, RouteHeader, SIPURI, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('register', refresh_interval=None)
class SIPRegistrationDidFail(Exception):
def __init__(self, data):
self.data = data
class SIPRegistrationDidNotEnd(Exception):
def __init__(self, data):
self.data = data
class RegistrationError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
@implementer(IObserver)
class Registrar(object):
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.registered = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._registration = None
self._dns_wait = 1
self._register_wait = 1
self._registration_timer = None
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
if self.account.sip.register:
self.activate()
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('register'))
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unregister'))
def reregister(self):
if self.active:
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_register(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
# Initialize the registration
if self._registration is None:
duration = command.refresh_interval or self.account.sip.register_interval
self._registration = Registration(FromHeader(self.account.uri, self.account.display_name),
credentials=self.account.credentials,
duration=duration,
extra_headers=[Header('Supported', 'gruu')])
notification_center.add_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountWillRegister', sender=self.account)
else:
notification_center.post_notification('SIPAccountRegistrationWillRefresh', sender=self.account)
try:
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.account.id.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError as e:
retry_after = random.uniform(self._dns_wait, 2*self._dns_wait)
self._dns_wait = limit(2*self._dns_wait, max=30)
raise RegistrationError('DNS lookup failed: %s' % e, retry_after=retry_after)
else:
self._dns_wait = 1
# Register by trying each route in turn
register_timeout = time() + 30
for route in routes:
remaining_time = register_timeout-time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
contact_header = ContactHeader(contact_uri)
instance_id = '"<%s>"' % settings.instance_id
contact_header.parameters[b"+sip.instance"] = instance_id.encode()
if self.account.nat_traversal.use_ice:
contact_header.parameters[b"+sip.ice"] = None
route_header = RouteHeader(route.uri)
try:
self._registration.register(contact_header, route_header, timeout=limit(remaining_time, min=1, max=10))
except SIPCoreError:
raise RegistrationError('Internal error', retry_after=5)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidSucceed':
break
if notification.name == 'SIPRegistrationDidEnd':
raise RegistrationError('Registration expired', retry_after=0) # registration expired while we were trying to re-register
except SIPRegistrationDidFail as e:
notification_data = NotificationData(code=e.data.code, reason=e.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
if e.data.code == 401:
# Authentication failed, so retry the registration in some time
raise RegistrationError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.register_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise RegistrationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
else:
# Otherwise just try the next route
#continue
break
else:
notification_data = NotificationData(code=notification.data.code, reason=notification.data.reason, registration=self._registration, registrar=route)
notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data)
self.registered = True
# Save GRUU
try:
header = next(header for header in notification.data.contact_header_list if header.parameters.get('+sip.instance', '').strip('"<>') == settings.instance_id)
except StopIteration:
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
else:
public_gruu = header.parameters.get('pub-gruu', None)
temporary_gruu = header.parameters.get('temp-gruu', None)
try:
self.account.contact.public_gruu = SIPURI.parse(public_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.public_gruu = None
try:
self.account.contact.temporary_gruu = SIPURI.parse(temporary_gruu.strip('"'))
except (AttributeError, SIPCoreError):
self.account.contact.temporary_gruu = None
notification_data = NotificationData(contact_header=notification.data.contact_header,
contact_header_list=notification.data.contact_header_list,
expires=notification.data.expires_in, registrar=route)
notification_center.post_notification('SIPAccountRegistrationDidSucceed', sender=self.account, data=notification_data)
self._register_wait = 1
command.signal()
break
else:
# There are no more routes to try, reschedule the registration
retry_after = random.uniform(self._register_wait, 2*self._register_wait)
self._register_wait = limit(self._register_wait*2, max=30)
raise RegistrationError('No more routes to try', retry_after=retry_after)
except RegistrationError as e:
self.registered = False
notification_center.remove_observer(self, sender=self._registration)
notification_center.post_notification('SIPAccountRegistrationDidFail', sender=self.account, data=NotificationData(error=e.error, retry_after=e.retry_after))
- def register():
+ def register(e):
if self.active:
self._command_channel.send(Command('register', command.event, refresh_interval=e.refresh_interval))
self._registration_timer = None
- self._registration_timer = reactor.callLater(e.retry_after, register)
+ self._registration_timer = reactor.callLater(e.retry_after, register, e)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
def _CH_unregister(self, command):
# Cancel any timer which would restart the registration process
if self._registration_timer is not None and self._registration_timer.active():
self._registration_timer.cancel()
self._registration_timer = None
registered = self.registered
self.registered = False
if self._registration is not None:
notification_center = NotificationCenter()
if registered:
self._registration.end(timeout=2)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPRegistrationDidEnd':
break
except (SIPRegistrationDidFail, SIPRegistrationDidNotEnd) as e:
notification_center.post_notification('SIPAccountRegistrationDidNotEnd', sender=self.account, data=NotificationData(code=e.data.code, reason=e.data.reason,
registration=self._registration))
else:
notification_center.post_notification('SIPAccountRegistrationDidEnd', sender=self.account, data=NotificationData(registration=self._registration))
notification_center.remove_observer(self, sender=self._registration)
self._registration = None
self.account.contact.public_gruu = None
self.account.contact.temporary_gruu = None
command.signal()
def _CH_terminate(self, command):
self._CH_unregister(command)
raise proc.ProcExit
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRegistrationDidSucceed(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidFail(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidFail(notification.data))
def _NH_SIPRegistrationDidEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send(notification)
def _NH_SIPRegistrationDidNotEnd(self, notification):
if notification.sender is self._registration:
self._data_channel.send_exception(SIPRegistrationDidNotEnd(notification.data))
def _NH_SIPRegistrationWillExpire(self, notification):
if self.active:
self._command_channel.send(Command('register'))
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'sip.register' in notification.data.modified:
if self.account.sip.register:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'nat_traversal.use_ice', 'sip.outbound_proxy', 'sip.transport_list', 'sip.register_interval'}.intersection(notification.data.modified):
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
diff --git a/sipsimple/account/subscription.py b/sipsimple/account/subscription.py
index 1373c7fe..39eacc61 100644
--- a/sipsimple/account/subscription.py
+++ b/sipsimple/account/subscription.py
@@ -1,494 +1,494 @@
"""Implements the subscription handlers"""
__all__ = ['Subscriber', 'MWISubscriber', 'PresenceWinfoSubscriber', 'DialogWinfoSubscriber', 'PresenceSubscriber', 'SelfPresenceSubscriber', 'DialogSubscriber']
import random
from abc import ABCMeta, abstractproperty
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from eventlib import coros, proc
from twisted.internet import reactor
from zope.interface import implementer
from sipsimple.core import ContactHeader, FromHeader, Header, RouteHeader, SIPURI, Subscription, ToHeader, SIPCoreError, NoGRUU
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
Command.register_defaults('subscribe', refresh_interval=None)
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, retry_after, refresh_interval=None):
self.error = error
self.retry_after = retry_after
self.refresh_interval = refresh_interval
class InterruptSubscription(Exception): pass
class TerminateSubscription(Exception): pass
class Content(object):
def __init__(self, body, type):
self.body = body
self.type = type
class SubscriberNickname(dict):
def __missing__(self, name):
return self.setdefault(name, name[:-10] if name.endswith('Subscriber') else name)
def __get__(self, obj, objtype):
return self[objtype.__name__]
def __set__(self, obj, value):
raise AttributeError('cannot set attribute')
def __delete__(self, obj):
raise AttributeError('cannot delete attribute')
@implementer(IObserver)
class Subscriber(object, metaclass=ABCMeta):
__nickname__ = SubscriberNickname()
__transports__ = frozenset(['tls', 'tcp', 'udp'])
def __init__(self, account):
self.account = account
self.started = False
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
@abstractproperty
def event(self):
return None
@property
def subscription_uri(self):
return self.account.id
@property
def content(self):
return Content(None, None)
@property
def extra_headers(self):
return []
def start(self):
if self.started:
return
self.started = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self)
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self)
notification_center.remove_observer(self, sender=self)
def stop(self):
if not self.started:
return
self.started = False
self.active = False
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self)
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self._command_proc = None
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self)
notification_center.remove_observer(self, sender=self)
def activate(self):
if not self.started:
raise RuntimeError("not started")
self.active = True
self._command_channel.send(Command('subscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self)
def deactivate(self):
if not self.started:
raise RuntimeError("not started")
self.active = False
self._command_channel.send(Command('unsubscribe'))
notification_center = NotificationCenter()
notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self)
def resubscribe(self):
if self.active:
self._command_channel.send(Command('subscribe'))
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
self._CH_unsubscribe(command)
raise proc.ProcExit
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
subscription_uri = self.subscription_uri
refresh_interval = command.refresh_interval or self.account.sip.subscribe_interval
valid_transports = self.__transports__.intersection(settings.sip.transport_list)
try:
# Lookup routes
if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports:
uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport})
elif self.account.sip.always_use_my_proxy:
uri = SIPURI(host=self.account.id.domain)
else:
uri = SIPURI(host=subscription_uri.domain)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, valid_transports).wait()
except DNSLookupError as e:
raise SubscriptionError('DNS lookup failed: %s' % e, retry_after=random.uniform(15, 30))
subscription_uri = SIPURI(user=subscription_uri.username, host=subscription_uri.domain)
content = self.content
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = self.account.contact[NoGRUU, route]
except KeyError:
continue
subscription = Subscription(subscription_uri, FromHeader(self.account.uri, self.account.display_name),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
self.event,
RouteHeader(route.uri),
credentials=self.account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(body=content.body, content_type=content.type, extra_headers=self.extra_headers, timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError('Internal error', retry_after=5)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail as e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError('Authentication failed', retry_after=random.uniform(60, 120))
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > self.account.sip.subscribe_interval:
refresh_interval = e.data.min_expires
else:
refresh_interval = None
raise SubscriptionError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError('Method or event not supported', retry_after=3600)
elif e.data.code == 1400:
raise SubscriptionError(e.data.reason, retry_after=3600)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
raise SubscriptionError('No more routes to try', retry_after=random.uniform(60, 180))
# At this point it is subscribed. Handle notifications and ending/failures.
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidStart', sender=self)
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionGotNotify':
notification_center.post_notification(self.__nickname__ + 'SubscriptionGotNotify', sender=self, data=notification.data)
elif notification.name == 'SIPSubscriptionDidEnd':
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='remote'))
if self.active:
self._command_channel.send(Command('subscribe'))
break
except SIPSubscriptionDidFail:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
if self.active:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
finally:
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except TerminateSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local'))
except SubscriptionError as e:
- def subscribe():
+ def subscribe(e):
if self.active:
self._command_channel.send(Command('subscribe', command.event, refresh_interval=e.refresh_interval))
self._subscription_timer = None
- self._subscription_timer = reactor.callLater(e.retry_after, subscribe)
+ self._subscription_timer = reactor.callLater(e.retry_after, subscribe, e)
notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
if notification.sender is self._subscription:
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
if notification.sender is self._subscription:
self._data_channel.send(notification)
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._command_channel.send(Command('subscribe'))
class MWISubscriber(Subscriber):
"""Message Waiting Indicator subscriber"""
@property
def event(self):
return 'message-summary'
@property
def subscription_uri(self):
return self.account.message_summary.voicemail_uri or self.account.id
def _NH_MWISubscriberWillStart(self, notification):
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_MWISubscriberDidStart(self, notification):
if self.account.message_summary.enabled:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'message_summary.enabled' in notification.data.modified:
if self.account.message_summary.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'message_summary.voicemail_uri', 'sip.always_use_my_proxy', 'sip.outbound_proxy',
'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class AbstractPresenceSubscriber(Subscriber):
"""Abstract class defining behavior for all presence subscribers"""
__transports__ = frozenset(['tls', 'tcp'])
def _NH_AbstractPresenceSubscriberWillStart(self, notification):
notification.center.add_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberWillEnd(self, notification):
notification.center.remove_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account)
notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings())
def _NH_AbstractPresenceSubscriberDidStart(self, notification):
if self.account.presence.enabled and self.account.xcap.discovered:
self.activate()
def _NH_SIPAccountDidDiscoverXCAPSupport(self, notification):
if self.account.presence.enabled and not self.active:
self.activate()
@run_in_green_thread
def _NH_CFGSettingsObjectDidChange(self, notification):
if not self.started or not self.account.xcap.discovered:
return
if 'enabled' in notification.data.modified:
return # global account activation is handled separately by the account itself
elif 'presence.enabled' in notification.data.modified:
if self.account.presence.enabled:
self.activate()
else:
self.deactivate()
elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.always_use_my_proxy', 'sip.outbound_proxy', 'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified):
self._command_channel.send(Command('subscribe'))
class PresenceWinfoSubscriber(AbstractPresenceSubscriber):
"""Presence Watcher Info subscriber"""
_NH_PresenceWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence.winfo'
class DialogWinfoSubscriber(AbstractPresenceSubscriber):
"""Dialog Watcher Info subscriber"""
_NH_DialogWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog.winfo'
class PresenceSubscriber(AbstractPresenceSubscriber):
"""Presence subscriber"""
_NH_PresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_PresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_PresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_presence_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
class SelfPresenceSubscriber(AbstractPresenceSubscriber):
"""Self presence subscriber"""
_NH_SelfPresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_SelfPresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_SelfPresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'presence'
@property
def subscription_uri(self):
return self.account.id
class DialogSubscriber(AbstractPresenceSubscriber):
"""Dialog subscriber"""
_NH_DialogSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart
_NH_DialogSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd
_NH_DialogSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart
@property
def event(self):
return 'dialog'
@property
def subscription_uri(self):
return self.account.xcap_manager.rls_dialog_uri
@property
def extra_headers(self):
return [Header('Supported', 'eventlist')]
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 5:49 AM (1 d, 8 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3407604
Default Alt Text
(55 KB)
Attached To
Mode
rPYNSIPSIMPLE python3-sipsimple
Attached
Detach File
Event Timeline
Log In to Comment