diff --git a/sipsimple/account/bonjour/__init__.py b/sipsimple/account/bonjour/__init__.py index 3462b402..ede0d759 100644 --- a/sipsimple/account/bonjour/__init__.py +++ b/sipsimple/account/bonjour/__init__.py @@ -1,503 +1,503 @@ """Implements Bonjour service handlers""" __all__ = ['BonjourServices'] import re import uuid from threading import Lock from weakref import WeakKeyDictionary from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib import api, coros, proc from eventlib.green import select from twisted.internet import reactor from zope.interface import implementer from sipsimple import log from sipsimple.account.bonjour import _bonjour from sipsimple.core import FrozenSIPURI, SIPCoreError, NoGRUU from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread from sipsimple.threading.green import Command class RestartSelect(Exception): pass class BonjourFile(object): __instances__ = WeakKeyDictionary() def __new__(cls, file): if cls is BonjourFile: raise TypeError("BonjourFile cannot be instantiated directly") instance = cls.__instances__.get(file) if instance is None: instance = object.__new__(cls) instance.file = file instance.active = False cls.__instances__[file] = instance return instance def fileno(self): return self.file.fileno() if not self.closed else -1 def close(self): self.file.close() self.file = None @property def closed(self): return self.file is None @classmethod def find_by_file(cls, file): """Return the instance matching the given DNSServiceRef file""" try: return cls.__instances__[file] except KeyError: raise KeyError("cannot find a %s matching the given DNSServiceRef file" % cls.__name__) class BonjourDiscoveryFile(BonjourFile): def __new__(cls, file, transport): instance = BonjourFile.__new__(cls, file) instance.transport = transport return instance class BonjourRegistrationFile(BonjourFile): def __new__(cls, file, transport): instance = BonjourFile.__new__(cls, file) instance.transport = transport return instance class BonjourResolutionFile(BonjourFile): def __new__(cls, file, discovery_file, service_description): instance = BonjourFile.__new__(cls, file) instance.discovery_file = discovery_file instance.service_description = service_description return instance @property def transport(self): return self.discovery_file.transport class BonjourServiceDescription(object): def __init__(self, name, type, domain): self.name = name self.type = type self.domain = domain def __repr__(self): return "%s(%r, %r, %r)" % (self.__class__.__name__, self.name, self.type, self.domain) def __hash__(self): return hash(self.name) def __eq__(self, other): if isinstance(other, BonjourServiceDescription): return self.name==other.name and self.type==other.type and self.domain==other.domain return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal class BonjourNeighbourPresence(object): def __init__(self, state, note): self.state = state self.note = note class BonjourNeighbourRecord(object): def __init__(self, service_description, host, txtrecord): self.id = txtrecord.get('instance_id', None).decode() self.name = txtrecord.get('name', '').decode('utf-8') or None self.host = re.match(r'^(?P.*?)(\.local)?\.?$', host).group('host') contact = txtrecord.get('contact', service_description.name).decode() self.uri = FrozenSIPURI.parse(contact) state = txtrecord.get('state', txtrecord.get('status', None)) # status is read for legacy (remove later) -Dan if state: state = state.decode() note = txtrecord.get('note', '') or None if note: note = note.decode() self.presence = BonjourNeighbourPresence(state, note) @implementer(IObserver) class BonjourServices(object): def __init__(self, account): self.account = account self._started = False self._files = [] self._neighbours = {} self._command_channel = coros.queue() self._select_proc = None self._discover_timer = None self._register_timer = None self._update_timer = None self._lock = Lock() self.__dict__['presence_state'] = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def activate(self): self._started = True self._command_channel.send(Command('register')) self._command_channel.send(Command('discover')) def deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._started = False def restart_discovery(self): self._command_channel.send(Command('discover')) def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) @property def presence_state(self): return self.__dict__['presence_state'] @presence_state.setter def presence_state(self, state): if state is not None and not isinstance(state, BonjourPresenceState): raise ValueError("state must be a BonjourPresenceState instance or None") with self._lock: old_state = self.__dict__['presence_state'] self.__dict__['presence_state'] = state if state != old_state: call_in_twisted_thread(self.update_registrations) def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourAccountRegistrationDidSucceed', sender=self.account, data=NotificationData(name=name, transport=file.transport)) else: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourAccountRegistrationDidFail', sender=self.account, data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: - self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) + self._register_timer = reactor.callLater(10, self._command_channel.send, Command('register')) def _browse_cb(self, file, flags, interface_index, error_code, service_name, regtype, reply_domain): notification_center = NotificationCenter() file = BonjourDiscoveryFile.find_by_file(file) service_description = BonjourServiceDescription(service_name, regtype, reply_domain) if error_code != _bonjour.kDNSServiceErr_NoError: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourAccountDiscoveryDidFail', sender=self.account, data=NotificationData(reason=str(error), transport=file.transport)) removed_files = [file] + [f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file] for f in removed_files: self._files.remove(f) self._select_proc.kill(RestartSelect) for f in removed_files: f.close() if self._discover_timer is None: - self._discover_timer = reactor.callLater(1, self._command_channel.send, Command('discover')) + self._discover_timer = reactor.callLater(5, self._command_channel.send, Command('discover')) return if reply_domain != 'local.': return if flags & _bonjour.kDNSServiceFlagsAdd: try: resolution_file = next((f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file and f.service_description==service_description)) except StopIteration: try: resolution_file = _bonjour.DNSServiceResolve(0, interface_index, service_name, regtype, reply_domain, self._resolve_cb) except _bonjour.BonjourError as e: notification_center.post_notification('BonjourAccountDiscoveryFailure', sender=self.account, data=NotificationData(error=str(e), transport=file.transport)) else: resolution_file = BonjourResolutionFile(resolution_file, discovery_file=file, service_description=service_description) self._files.append(resolution_file) self._select_proc.kill(RestartSelect) else: try: resolution_file = next((f for f in self._files if isinstance(f, BonjourResolutionFile) and f.discovery_file==file and f.service_description==service_description)) except StopIteration: pass else: self._files.remove(resolution_file) self._select_proc.kill(RestartSelect) resolution_file.close() service_description = resolution_file.service_description if service_description in self._neighbours: record = self._neighbours.pop(service_description) notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record)) def _resolve_cb(self, file, flags, interface_index, error_code, fullname, host_target, port, txtrecord): notification_center = NotificationCenter() settings = SIPSimpleSettings() file = BonjourResolutionFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: service_description = file.service_description try: record = BonjourNeighbourRecord(service_description, host_target, _bonjour.TXTRecord.parse(txtrecord)) except SIPCoreError: pass else: transport = record.uri.transport supported_transport = transport in settings.sip.transport_list and (transport!='tls' or self.account.tls.certificate is not None) if not supported_transport and service_description in self._neighbours: record = self._neighbours.pop(service_description) notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record)) elif supported_transport: try: our_contact_uri = self.account.contact[NoGRUU, transport] except KeyError: return if str(record.uri) != str(our_contact_uri): had_neighbour = service_description in self._neighbours self._neighbours[service_description] = record notification_name = 'BonjourAccountDidUpdateNeighbour' if had_neighbour else 'BonjourAccountDidAddNeighbour' notification_data = NotificationData(neighbour=service_description, record=record) notification_center.post_notification(notification_name, sender=self.account, data=notification_data) else: self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourAccountDiscoveryFailure', sender=self.account, data=NotificationData(error=str(error), transport=file.transport)) # start a new resolve process here? -Dan def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if self._started: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourAccountRegistrationDidEnd', sender=self.account, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport != 'tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() if len(missing_transports) > 1 and 'udp' in missing_transports: missing_transports.remove('udp') for transport in missing_transports: notification_center.post_notification('BonjourAccountWillRegister', sender=self.account, data=NotificationData(transport=transport)) try: contact = self.account.contact[NoGRUU, transport] instance_id = str(uuid.UUID(settings.instance_id)) txtdata = dict(txtvers=1, name=self.account.display_name, contact="<%s>" % str(contact), instance_id=instance_id) state = self.account.presence_state if self.account.presence.enabled and state is not None: txtdata['state'] = state.state txtdata['note'] = state.note file = _bonjour.DNSServiceRegister(name=str(contact), regtype="_sipuri._%s" % (transport if transport == 'udp' else 'tcp'), port=contact.port, callBack=self._register_cb, txtRecord=_bonjour.TXTRecord(items=txtdata)) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourAccountRegistrationDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: - self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) + self._register_timer = reactor.callLater(10, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact = self.account.contact[NoGRUU, file.transport] instance_id = str(uuid.UUID(settings.instance_id)) txtdata = dict(txtvers=1, name=self.account.display_name, contact="<%s>" % str(contact), instance_id=instance_id) state = self.account.presence_state if self.account.presence.enabled and state is not None: txtdata['state'] = state.state txtdata['note'] = state.note _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourAccountRegistrationUpdateDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_discover(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._discover_timer is not None and self._discover_timer.active(): self._discover_timer.cancel() self._discover_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) discoverable_transports = set('tcp' if transport=='tls' else transport for transport in supported_transports) old_files = [] for file in (f for f in self._files[:] if isinstance(f, (BonjourDiscoveryFile, BonjourResolutionFile)) and f.transport not in discoverable_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() for service_description in [service for service, record in list(self._neighbours.items()) if record.uri.transport not in supported_transports]: record = self._neighbours.pop(service_description) notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=service_description, record=record)) discovered_transports = set(file.transport for file in self._files if isinstance(file, BonjourDiscoveryFile)) missing_transports = discoverable_transports - discovered_transports added_transports = set() for transport in missing_transports: notification_center.post_notification('BonjourAccountWillInitiateDiscovery', sender=self.account, data=NotificationData(transport=transport)) try: file = _bonjour.DNSServiceBrowse(regtype="_sipuri._%s" % transport, callBack=self._browse_cb) except _bonjour.BonjourError as e: notification_center.post_notification('BonjourAccountDiscoveryDidFail', sender=self.account, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourDiscoveryFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: - self._discover_timer = reactor.callLater(1, self._command_channel.send, Command('discover', command.event)) + self._discover_timer = reactor.callLater(5, self._command_channel.send, Command('discover', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.exception() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._discover_timer is not None and self._discover_timer.active(): self._discover_timer.cancel() self._discover_timer = None if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None files = self._files neighbours = self._neighbours self._files = [] self._select_proc.kill(RestartSelect) self._neighbours = {} for file in files: file.close() notification_center = NotificationCenter() for neighbour, record in list(neighbours.items()): notification_center.post_notification('BonjourAccountDidRemoveNeighbour', sender=self.account, data=NotificationData(neighbour=neighbour, record=record)) for transport in set(file.transport for file in files): notification_center.post_notification('BonjourAccountRegistrationDidEnd', sender=self.account, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_NetworkConditionsDidChange(self, notification): if self._files: self.restart_discovery() self.restart_registration() class BonjourPresenceState(object): def __init__(self, state, note=None): self.state = state self.note = note or '' def __eq__(self, other): if isinstance(other, BonjourPresenceState): return self.state == other.state and self.note == other.note return NotImplemented def __ne__(self, other): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal diff --git a/sipsimple/account/publication.py b/sipsimple/account/publication.py index 268d1d23..d25649dd 100644 --- a/sipsimple/account/publication.py +++ b/sipsimple/account/publication.py @@ -1,390 +1,398 @@ """Implements the publisher handlers""" __all__ = ['Publisher', 'PresencePublisher', 'DialogPublisher'] import random from abc import ABCMeta, abstractproperty from threading import Lock from time import time from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.types import MarkerType +from application.system import host as Host from eventlib import coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple.core import FromHeader, Publication, PublicationETagError, RouteHeader, SIPURI, SIPCoreError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.dialoginfo import DialogInfoDocument from sipsimple.payloads.pidf import PIDFDocument from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread Command.register_defaults('publish', refresh_interval=None) class SameState(metaclass=MarkerType): pass class SIPPublicationDidFail(Exception): def __init__(self, data): self.data = data class SIPPublicationDidNotEnd(Exception): def __init__(self, data): self.data = data class PublicationError(Exception): def __init__(self, error, retry_after, refresh_interval=None): self.error = error self.retry_after = retry_after self.refresh_interval = refresh_interval class PublisherNickname(dict): def __missing__(self, name): return self.setdefault(name, name[:-9] if name.endswith('Publisher') else name) def __get__(self, obj, objtype): return self[objtype.__name__] def __set__(self, obj, value): raise AttributeError('cannot set attribute') def __delete__(self, obj): raise AttributeError('cannot delete attribute') @implementer(IObserver) class Publisher(object, metaclass=ABCMeta): __nickname__ = PublisherNickname() __transports__ = frozenset(['tls', 'tcp', 'udp']) def __init__(self, account): self.account = account self.started = False self.active = False self.publishing = False self._lock = Lock() self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._publication = None self._dns_wait = 1 self._publish_wait = 1 self._publication_timer = None self.__dict__['state'] = None @abstractproperty def event(self): return None @abstractproperty def payload_type(self): return None @property def extra_headers(self): return [] @property def state(self): return self.__dict__['state'] @state.setter def state(self, state): if state is not None and not isinstance(state, self.payload_type.root_element): raise ValueError("state must be a %s document or None" % self.payload_type.root_element.__name__) with self._lock: old_state = self.__dict__['state'] self.__dict__['state'] = state if state == old_state: return self._publish(state) def start(self): if self.started: return self.started = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self) notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self) notification_center.remove_observer(self, sender=self) def stop(self): if not self.started: return self.started = False self.active = False notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self) notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.remove_observer(self, name='NetworkConditionsDidChange') command = Command('terminate') self._command_channel.send(command) command.wait() self._command_proc = None notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self) notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self) notification_center.remove_observer(self, sender=self) def activate(self): if not self.started: raise RuntimeError("not started") self.active = True self._command_channel.send(Command('publish', state=self.state)) notification_center = NotificationCenter() notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self) def deactivate(self): if not self.started: raise RuntimeError("not started") self.active = False self._command_channel.send(Command('unpublish')) notification_center = NotificationCenter() notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self) @run_in_twisted_thread def _publish(self, state): if not self.active: return if state is None: self._command_channel.send(Command('unpublish')) else: self._command_channel.send(Command('publish', state=state)) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_publish(self, command): if command.state is None or self._publication is None and command.state is SameState: command.signal() return notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._publication_timer is not None and self._publication_timer.active(): self._publication_timer.cancel() self._publication_timer = None if self._publication is None: duration = command.refresh_interval or self.account.sip.publish_interval from_header = FromHeader(self.account.uri, self.account.display_name) self._publication = Publication(from_header, self.event, self.payload_type.content_type, credentials=self.account.credentials, duration=duration, extra_headers=self.extra_headers) notification_center.add_observer(self, sender=self._publication) notification_center.post_notification(self.__class__.__name__ + 'WillPublish', sender=self, data=NotificationData(state=command.state, duration=duration)) else: notification_center.post_notification(self.__class__.__name__ + 'WillRefresh', sender=self, data=NotificationData(state=command.state)) try: + if Host.default_ip is None: + raise PublicationError('No IP address', retry_after=60) + # Lookup routes valid_transports = self.__transports__.intersection(settings.sip.transport_list) if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports: uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.account.id.domain) + lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, valid_transports).wait() except DNSLookupError as e: retry_after = random.uniform(self._dns_wait, 2*self._dns_wait) self._dns_wait = limit(2*self._dns_wait, max=30) raise PublicationError('DNS lookup failed: %s' % e, retry_after=retry_after) else: self._dns_wait = 1 body = None if command.state is SameState else command.state.toxml() # Publish by trying each route in turn publish_timeout = time() + 30 for route in routes: + if Host.default_ip is None: + raise PublicationError('No IP address', retry_after=60) + remaining_time = publish_timeout-time() if remaining_time > 0: try: try: self._publication.publish(body, RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10)) except ValueError as e: # this happens for an initial PUBLISH with body=None raise PublicationError(str(e), retry_after=0) except PublicationETagError: state = self.state # access self.state only once to avoid race conditions if state is not None: self._publication.publish(state.toxml(), RouteHeader(route.uri), timeout=limit(remaining_time, min=1, max=10)) else: command.signal() return except SIPCoreError: raise PublicationError('Internal error', retry_after=5) try: while True: notification = self._data_channel.wait() if notification.name == 'SIPPublicationDidSucceed': break if notification.name == 'SIPPublicationDidEnd': raise PublicationError('Publication expired', retry_after=random.uniform(60, 120)) # publication expired while we were trying to re-publish except SIPPublicationDidFail as e: if e.data.code == 407: # Authentication failed, so retry the publication in some time raise PublicationError('Authentication failed', retry_after=random.uniform(60, 120)) elif e.data.code == 412: raise PublicationError('Conditional request failed', retry_after=0) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > self.account.sip.publish_interval: refresh_interval = e.data.min_expires else: refresh_interval = None raise PublicationError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval) elif e.data.code in (405, 406, 489): raise PublicationError('Method or event not supported', retry_after=3600) else: # Otherwise just try the next route continue else: self.publishing = True self._publish_wait = 1 command.signal() break else: # There are no more routes to try, reschedule the publication retry_after = random.uniform(self._publish_wait, 2*self._publish_wait) self._publish_wait = limit(self._publish_wait*2, max=30) raise PublicationError('No more routes to try', retry_after=retry_after) except PublicationError as e: self.publishing = False notification_center.remove_observer(self, sender=self._publication) def publish(e): if self.active: self._command_channel.send(Command('publish', event=command.event, state=self.state, refresh_interval=e.refresh_interval)) else: command.signal() self._publication_timer = None self._publication_timer = reactor.callLater(e.retry_after, publish, e) self._publication = None notification_center.post_notification(self.__nickname__ + 'PublicationDidFail', sender=self, data=NotificationData(reason=e.error)) else: notification_center.post_notification(self.__nickname__ + 'PublicationDidSucceed', sender=self) def _CH_unpublish(self, command): # Cancel any timer which would restart the publication process if self._publication_timer is not None and self._publication_timer.active(): self._publication_timer.cancel() self._publication_timer = None publishing = self.publishing self.publishing = False if self._publication is not None: notification_center = NotificationCenter() if publishing: self._publication.end(timeout=2) try: while True: notification = self._data_channel.wait() if notification.name == 'SIPPublicationDidEnd': break except (SIPPublicationDidFail, SIPPublicationDidNotEnd): notification_center.post_notification(self.__nickname__ + 'PublicationDidNotEnd', sender=self) else: notification_center.post_notification(self.__nickname__ + 'PublicationDidEnd', sender=self) notification_center.remove_observer(self, sender=self._publication) self._publication = None command.signal() def _CH_terminate(self, command): self._CH_unpublish(command) raise proc.ProcExit @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPPublicationDidSucceed(self, notification): if notification.sender is self._publication: self._data_channel.send(notification) def _NH_SIPPublicationDidFail(self, notification): if notification.sender is self._publication: self._data_channel.send_exception(SIPPublicationDidFail(notification.data)) def _NH_SIPPublicationDidEnd(self, notification): if notification.sender is self._publication: self._data_channel.send(notification) def _NH_SIPPublicationDidNotEnd(self, notification): if notification.sender is self._publication: self._data_channel.send_exception(SIPPublicationDidNotEnd(notification.data)) def _NH_SIPPublicationWillExpire(self, notification): if notification.sender is self._publication: self._publish(SameState) @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if not self.started: return if 'enabled' in notification.data.modified: return # global account activation is handled separately by the account itself elif 'presence.enabled' in notification.data.modified: if self.account.presence.enabled: self.activate() else: self.deactivate() elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.outbound_proxy', 'sip.transport_list', 'sip.publish_interval'}.intersection(notification.data.modified): self._command_channel.send(Command('unpublish')) self._command_channel.send(Command('publish', state=self.state)) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._command_channel.send(Command('unpublish')) self._command_channel.send(Command('publish', state=self.state)) class PresencePublisher(Publisher): """A publisher for presence state""" @property def event(self): return 'presence' @property def payload_type(self): return PIDFDocument def _NH_PresencePublisherDidStart(self, notification): if self.account.presence.enabled: self.activate() class DialogPublisher(Publisher): """A publisher for dialog info state""" @property def event(self): return 'dialog' @property def payload_type(self): return DialogInfoDocument def _NH_DialogPublisherDidStart(self, notification): if self.account.presence.enabled: self.activate() diff --git a/sipsimple/account/registration.py b/sipsimple/account/registration.py index e61a77b9..4390c36a 100644 --- a/sipsimple/account/registration.py +++ b/sipsimple/account/registration.py @@ -1,316 +1,320 @@ """Implements the registration handler""" __all__ = ['Registrar'] import random from time import time from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit +from application.system import host as Host from eventlib import coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple.core import ContactHeader, FromHeader, Header, Registration, RouteHeader, SIPURI, SIPCoreError, NoGRUU from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread Command.register_defaults('register', refresh_interval=None) class SIPRegistrationDidFail(Exception): def __init__(self, data): self.data = data class SIPRegistrationDidNotEnd(Exception): def __init__(self, data): self.data = data class RegistrationError(Exception): def __init__(self, error, retry_after, refresh_interval=None): self.error = error self.retry_after = retry_after self.refresh_interval = refresh_interval @implementer(IObserver) class Registrar(object): def __init__(self, account): self.account = account self.started = False self.active = False self.registered = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._registration = None self._dns_wait = 1 self._register_wait = 1 self._registration_timer = None def start(self): if self.started: return self.started = True notification_center = NotificationCenter() notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification_center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) if self.account.sip.register: self.activate() def stop(self): if not self.started: return self.started = False self.active = False notification_center = NotificationCenter() notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification_center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) notification_center.remove_observer(self, name='NetworkConditionsDidChange') command = Command('terminate') self._command_channel.send(command) command.wait() self._command_proc = None def activate(self): if not self.started: raise RuntimeError("not started") self.active = True self._command_channel.send(Command('register')) def deactivate(self): if not self.started: raise RuntimeError("not started") self.active = False self._command_channel.send(Command('unregister')) def reregister(self): if self.active: self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def _run(self): while True: command = self._command_channel.wait() #print('Registrar for %s got command %s' % (self.account.id, command.name)) handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._registration_timer is not None and self._registration_timer.active(): self._registration_timer.cancel() self._registration_timer = None try: + if Host.default_ip is None: + raise RegistrationError('No IP address', retry_after=60) + # Initialize the registration if self._registration is None: duration = command.refresh_interval or self.account.sip.register_interval try: self._registration = Registration(FromHeader(self.account.uri, self.account.display_name), credentials=self.account.credentials, duration=duration, extra_headers=[Header('Supported', 'gruu')]) except Exception as e: raise RegistrationError('Cannot create registration: %s' % str(e), retry_after=120) notification_center.add_observer(self, sender=self._registration) notification_center.post_notification('SIPAccountWillRegister', sender=self.account) else: notification_center.post_notification('SIPAccountRegistrationWillRefresh', sender=self.account) # Lookup routes if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.account.id.domain) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: retry_after = int(random.uniform(self._dns_wait, 2*self._dns_wait)) self._dns_wait = limit(2*self._dns_wait, max=30) raise RegistrationError('DNS lookup failed: %s' % e, retry_after=retry_after) else: self._dns_wait = 1 # Register by trying each route in turn register_timeout = time() + 30 for route in routes: remaining_time = register_timeout-time() if remaining_time > 0: try: contact_uri = self.account.contact[NoGRUU, route] except KeyError: continue contact_header = ContactHeader(contact_uri) instance_id = '"<%s>"' % settings.instance_id contact_header.parameters[b"+sip.instance"] = instance_id.encode() if self.account.nat_traversal.use_ice: contact_header.parameters[b"+sip.ice"] = None route_header = RouteHeader(route.uri) try: self._registration.register(contact_header, route_header, timeout=limit(remaining_time, min=1, max=10)) except SIPCoreError: raise RegistrationError('Internal error', retry_after=5) try: while True: notification = self._data_channel.wait() if notification.name == 'SIPRegistrationDidSucceed': break if notification.name == 'SIPRegistrationDidEnd': raise RegistrationError('Registration expired', retry_after=int(random.uniform(60, 120))) # registration expired while we were trying to re-register except SIPRegistrationDidFail as e: notification_data = NotificationData(code=e.data.code, reason=e.data.reason, registration=self._registration, registrar=route) notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data) if e.data.code == 401: # Authentication failed, so retry the registration in some time raise RegistrationError('Authentication failed', retry_after=int(random.uniform(60, 120))) elif e.data.code == 408: # Timeout - raise RegistrationError('Request timeout', retry_after=int(random.uniform(60, 120))) + raise RegistrationError('Request timeout', retry_after=int(random.uniform(15, 40))) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > self.account.sip.register_interval: refresh_interval = e.data.min_expires else: refresh_interval = None raise RegistrationError('Interval too short', retry_after=int(random.uniform(60, 120)), refresh_interval=refresh_interval) else: # Otherwise just try the next route #continue break else: notification_data = NotificationData(code=notification.data.code, reason=notification.data.reason, registration=self._registration, registrar=route) notification_center.post_notification('SIPAccountRegistrationGotAnswer', sender=self.account, data=notification_data) self.registered = True # Save GRUU try: header = next(header for header in notification.data.contact_header_list if header.parameters.get('+sip.instance', '').strip('"<>') == settings.instance_id) except StopIteration: self.account.contact.public_gruu = None self.account.contact.temporary_gruu = None else: public_gruu = header.parameters.get('pub-gruu', None) temporary_gruu = header.parameters.get('temp-gruu', None) try: self.account.contact.public_gruu = SIPURI.parse(public_gruu.strip('"')) except (AttributeError, SIPCoreError): self.account.contact.public_gruu = None try: self.account.contact.temporary_gruu = SIPURI.parse(temporary_gruu.strip('"')) except (AttributeError, SIPCoreError): self.account.contact.temporary_gruu = None notification_data = NotificationData(contact_header=notification.data.contact_header, contact_header_list=notification.data.contact_header_list, expires=notification.data.expires_in, registrar=route) notification_center.post_notification('SIPAccountRegistrationDidSucceed', sender=self.account, data=notification_data) self._register_wait = 1 command.signal() break else: # There are no more routes to try, reschedule the registration retry_after = int(random.uniform(self._register_wait, 2*self._register_wait)) self._register_wait = limit(self._register_wait*2, max=30) raise RegistrationError('No more routes to try', retry_after=retry_after) except RegistrationError as e: self.registered = False notification_center.discard_observer(self, sender=self._registration) notification_center.post_notification('SIPAccountRegistrationDidFail', sender=self.account, data=NotificationData(error=e.error, retry_after=e.retry_after)) def register(e): if self.active: self._command_channel.send(Command('register', command.event, refresh_interval=e.refresh_interval)) self._registration_timer = None self._registration_timer = reactor.callLater(e.retry_after, register, e) self._registration = None self.account.contact.public_gruu = None self.account.contact.temporary_gruu = None def _CH_unregister(self, command): # Cancel any timer which would restart the registration process if self._registration_timer is not None and self._registration_timer.active(): self._registration_timer.cancel() self._registration_timer = None registered = self.registered self.registered = False if self._registration is not None: notification_center = NotificationCenter() if registered: self._registration.end(timeout=2) try: while True: notification = self._data_channel.wait() if notification.name == 'SIPRegistrationDidEnd': break except (SIPRegistrationDidFail, SIPRegistrationDidNotEnd) as e: notification_center.post_notification('SIPAccountRegistrationDidNotEnd', sender=self.account, data=NotificationData(code=e.data.code, reason=e.data.reason, registration=self._registration)) else: notification_center.post_notification('SIPAccountRegistrationDidEnd', sender=self.account, data=NotificationData(registration=self._registration)) notification_center.remove_observer(self, sender=self._registration) self._registration = None self.account.contact.public_gruu = None self.account.contact.temporary_gruu = None command.signal() def _CH_terminate(self, command): self._CH_unregister(command) raise proc.ProcExit @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPRegistrationDidSucceed(self, notification): if notification.sender is self._registration: self._data_channel.send(notification) def _NH_SIPRegistrationDidFail(self, notification): if notification.sender is self._registration: self._data_channel.send_exception(SIPRegistrationDidFail(notification.data)) def _NH_SIPRegistrationDidEnd(self, notification): if notification.sender is self._registration: self._data_channel.send(notification) def _NH_SIPRegistrationDidNotEnd(self, notification): if notification.sender is self._registration: self._data_channel.send_exception(SIPRegistrationDidNotEnd(notification.data)) def _NH_SIPRegistrationWillExpire(self, notification): if self.active: self._command_channel.send(Command('register')) @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if not self.started: return if 'enabled' in notification.data.modified: return # global account activation is handled separately by the account itself elif 'sip.register' in notification.data.modified: if self.account.sip.register: self.activate() else: self.deactivate() elif self.active and {'__id__', 'auth.password', 'auth.username', 'nat_traversal.use_ice', 'sip.outbound_proxy', 'sip.transport_list', 'sip.register_interval'}.intersection(notification.data.modified): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) diff --git a/sipsimple/account/subscription.py b/sipsimple/account/subscription.py index 6b88aec7..a88dbbc9 100644 --- a/sipsimple/account/subscription.py +++ b/sipsimple/account/subscription.py @@ -1,494 +1,502 @@ """Implements the subscription handlers""" __all__ = ['Subscriber', 'MWISubscriber', 'PresenceWinfoSubscriber', 'DialogWinfoSubscriber', 'PresenceSubscriber', 'SelfPresenceSubscriber', 'DialogSubscriber'] import random from abc import ABCMeta, abstractproperty from time import time from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit +from application.system import host as Host from eventlib import coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple.core import ContactHeader, FromHeader, Header, RouteHeader, SIPURI, Subscription, ToHeader, SIPCoreError, NoGRUU from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread Command.register_defaults('subscribe', refresh_interval=None) class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class SubscriptionError(Exception): def __init__(self, error, retry_after, refresh_interval=None): self.error = error self.retry_after = retry_after self.refresh_interval = refresh_interval class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class Content(object): def __init__(self, body, type): self.body = body self.type = type class SubscriberNickname(dict): def __missing__(self, name): return self.setdefault(name, name[:-10] if name.endswith('Subscriber') else name) def __get__(self, obj, objtype): return self[objtype.__name__] def __set__(self, obj, value): raise AttributeError('cannot set attribute') def __delete__(self, obj): raise AttributeError('cannot delete attribute') @implementer(IObserver) class Subscriber(object, metaclass=ABCMeta): __nickname__ = SubscriberNickname() __transports__ = frozenset(['tls', 'tcp', 'udp']) def __init__(self, account): self.account = account self.started = False self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None @abstractproperty def event(self): return None @property def subscription_uri(self): return self.account.id @property def content(self): return Content(None, None) @property def extra_headers(self): return [] def start(self): if self.started: return self.started = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.post_notification(self.__class__.__name__ + 'WillStart', sender=self) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) notification_center.post_notification(self.__class__.__name__ + 'DidStart', sender=self) notification_center.remove_observer(self, sender=self) def stop(self): if not self.started: return self.started = False self.active = False notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.post_notification(self.__class__.__name__ + 'WillEnd', sender=self) notification_center.remove_observer(self, name='NetworkConditionsDidChange') command = Command('terminate') self._command_channel.send(command) command.wait() self._command_proc = None notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self) notification_center.post_notification(self.__class__.__name__ + 'DidEnd', sender=self) notification_center.remove_observer(self, sender=self) def activate(self): if not self.started: raise RuntimeError("not started") self.active = True self._command_channel.send(Command('subscribe')) notification_center = NotificationCenter() notification_center.post_notification(self.__class__.__name__ + 'DidActivate', sender=self) def deactivate(self): if not self.started: raise RuntimeError("not started") self.active = False self._command_channel.send(Command('unsubscribe')) notification_center = NotificationCenter() notification_center.post_notification(self.__class__.__name__ + 'DidDeactivate', sender=self) def resubscribe(self): if self.active: self._command_channel.send(Command('subscribe')) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): self._CH_unsubscribe(command) raise proc.ProcExit def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() subscription_uri = self.subscription_uri refresh_interval = command.refresh_interval or self.account.sip.subscribe_interval valid_transports = self.__transports__.intersection(settings.sip.transport_list) try: + if Host.default_ip is None: + raise SubscriptionError('No IP address', retry_after=60) + # Lookup routes if self.account.sip.outbound_proxy is not None and self.account.sip.outbound_proxy.transport in valid_transports: uri = SIPURI(host=self.account.sip.outbound_proxy.host, port=self.account.sip.outbound_proxy.port, parameters={'transport': self.account.sip.outbound_proxy.transport}) elif self.account.sip.always_use_my_proxy: uri = SIPURI(host=self.account.id.domain) else: uri = SIPURI(host=subscription_uri.domain) + lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, valid_transports).wait() except DNSLookupError as e: raise SubscriptionError('DNS lookup failed: %s' % e, retry_after=random.uniform(15, 30)) subscription_uri = SIPURI(user=subscription_uri.username, host=subscription_uri.domain) content = self.content timeout = time() + 30 for route in routes: + if Host.default_ip is None: + raise SubscriptionError('No IP address', retry_after=60) + remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = self.account.contact[NoGRUU, route] except KeyError: continue subscription = Subscription(subscription_uri, FromHeader(self.account.uri, self.account.display_name), ToHeader(subscription_uri), ContactHeader(contact_uri), self.event.encode(), RouteHeader(route.uri), credentials=self.account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(body=content.body, content_type=content.type, extra_headers=self.extra_headers, timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError('Internal error', retry_after=5) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError('Authentication failed', retry_after=random.uniform(60, 120)) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > self.account.sip.subscribe_interval: refresh_interval = e.data.min_expires else: refresh_interval = None raise SubscriptionError('Interval too short', retry_after=random.uniform(60, 120), refresh_interval=refresh_interval) elif e.data.code in (405, 406, 489): raise SubscriptionError('Method or event not supported', retry_after=3600) elif e.data.code == 1400: raise SubscriptionError(e.data.reason, retry_after=3600) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription raise SubscriptionError('No more routes to try', retry_after=random.uniform(60, 180)) # At this point it is subscribed. Handle notifications and ending/failures. notification_center.post_notification(self.__nickname__ + 'SubscriptionDidStart', sender=self) try: while True: notification = self._data_channel.wait() if notification.name == 'SIPSubscriptionGotNotify': notification_center.post_notification(self.__nickname__ + 'SubscriptionGotNotify', sender=self, data=notification.data) elif notification.name == 'SIPSubscriptionDidEnd': notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='remote')) if self.active: self._command_channel.send(Command('subscribe')) break except SIPSubscriptionDidFail: notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self) if self.active: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass finally: notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) notification_center.post_notification(self.__nickname__ + 'SubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) except SubscriptionError as e: def subscribe(e): if self.active: self._command_channel.send(Command('subscribe', command.event, refresh_interval=e.refresh_interval)) self._subscription_timer = None self._subscription_timer = reactor.callLater(e.retry_after, subscribe, e) notification_center.post_notification(self.__nickname__ + 'SubscriptionDidFail', sender=self) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): if notification.sender is self._subscription: self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): if notification.sender is self._subscription: self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): if notification.sender is self._subscription: self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): if notification.sender is self._subscription: self._data_channel.send(notification) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._command_channel.send(Command('subscribe')) class MWISubscriber(Subscriber): """Message Waiting Indicator subscriber""" @property def event(self): return 'message-summary' @property def subscription_uri(self): return self.account.message_summary.voicemail_uri or self.account.id def _NH_MWISubscriberWillStart(self, notification): notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) def _NH_MWISubscriberWillEnd(self, notification): notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) def _NH_MWISubscriberDidStart(self, notification): if self.account.message_summary.enabled: self.activate() @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if not self.started: return if 'enabled' in notification.data.modified: return # global account activation is handled separately by the account itself elif 'message_summary.enabled' in notification.data.modified: if self.account.message_summary.enabled: self.activate() else: self.deactivate() elif self.active and {'__id__', 'auth.password', 'auth.username', 'message_summary.voicemail_uri', 'sip.always_use_my_proxy', 'sip.outbound_proxy', 'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified): self._command_channel.send(Command('subscribe')) class AbstractPresenceSubscriber(Subscriber): """Abstract class defining behavior for all presence subscribers""" __transports__ = frozenset(['tls', 'tcp']) def _NH_AbstractPresenceSubscriberWillStart(self, notification): notification.center.add_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account) notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification.center.add_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) def _NH_AbstractPresenceSubscriberWillEnd(self, notification): notification.center.remove_observer(self, name='SIPAccountDidDiscoverXCAPSupport', sender=self.account) notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=self.account) notification.center.remove_observer(self, name='CFGSettingsObjectDidChange', sender=SIPSimpleSettings()) def _NH_AbstractPresenceSubscriberDidStart(self, notification): if self.account.presence.enabled and self.account.xcap.discovered: self.activate() def _NH_SIPAccountDidDiscoverXCAPSupport(self, notification): if self.account.presence.enabled and not self.active: self.activate() @run_in_green_thread def _NH_CFGSettingsObjectDidChange(self, notification): if not self.started or not self.account.xcap.discovered: return if 'enabled' in notification.data.modified: return # global account activation is handled separately by the account itself elif 'presence.enabled' in notification.data.modified: if self.account.presence.enabled: self.activate() else: self.deactivate() elif self.active and {'__id__', 'auth.password', 'auth.username', 'sip.always_use_my_proxy', 'sip.outbound_proxy', 'sip.subscribe_interval', 'sip.transport_list'}.intersection(notification.data.modified): self._command_channel.send(Command('subscribe')) class PresenceWinfoSubscriber(AbstractPresenceSubscriber): """Presence Watcher Info subscriber""" _NH_PresenceWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart _NH_PresenceWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd _NH_PresenceWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart @property def event(self): return 'presence.winfo' class DialogWinfoSubscriber(AbstractPresenceSubscriber): """Dialog Watcher Info subscriber""" _NH_DialogWinfoSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart _NH_DialogWinfoSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd _NH_DialogWinfoSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart @property def event(self): return 'dialog.winfo' class PresenceSubscriber(AbstractPresenceSubscriber): """Presence subscriber""" _NH_PresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart _NH_PresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd _NH_PresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart @property def event(self): return 'presence' @property def subscription_uri(self): return self.account.xcap_manager.rls_presence_uri @property def extra_headers(self): return [Header('Supported', 'eventlist')] class SelfPresenceSubscriber(AbstractPresenceSubscriber): """Self presence subscriber""" _NH_SelfPresenceSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart _NH_SelfPresenceSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd _NH_SelfPresenceSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart @property def event(self): return 'presence' @property def subscription_uri(self): return self.account.id class DialogSubscriber(AbstractPresenceSubscriber): """Dialog subscriber""" _NH_DialogSubscriberWillStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillStart _NH_DialogSubscriberWillEnd = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberWillEnd _NH_DialogSubscriberDidStart = AbstractPresenceSubscriber._NH_AbstractPresenceSubscriberDidStart @property def event(self): return 'dialog' @property def subscription_uri(self): return self.account.xcap_manager.rls_dialog_uri @property def extra_headers(self): return [Header('Supported', 'eventlist')]