diff --git a/sylk-server b/sylk-server index 2528db6..bd83d85 100644 --- a/sylk-server +++ b/sylk-server @@ -1,117 +1,117 @@ #!/usr/bin/env python # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import os import signal import sys from application import log from application.configuration import ConfigFile from application.process import process, ProcessError from optparse import OptionParser import sipsimple import sylk def main(): name = 'sylk-server' fullname = 'SylkServer' runtime_directory = '/var/run/sylkserver' system_config_directory = '/etc/sylkserver' default_pid = os.path.join(runtime_directory, 'server.pid') default_config = sylk.configuration_filename if os.path.isfile(sylk.configuration_filename) else os.path.join(system_config_directory, sylk.configuration_filename) parser = OptionParser(version='%%prog %s' % sylk.__version__) parser.add_option('--no-fork', action='store_false', dest='fork', default=1, help='run the process in the foreground (for debugging)') parser.add_option('--pid', dest='pid_file', help='pid file ("%s")' % default_pid, metavar='File') parser.add_option('--config-file', dest='config_file', default=default_config, help='path to configuration file to read ("%s")' % default_config, metavar='File') parser.add_option('--enable-bonjour', action='store_true', dest='enable_bonjour', default=False, help='enable Bonjour services') parser.add_option('--debug-memory', action='store_true', dest='debug_memory', default=False, help='enable memory debugging (works only if --no-fork is specified)') (options, args) = parser.parse_args() path, configuration_filename = os.path.split(options.config_file) if path: system_config_directory = path process.system_config_directory = system_config_directory sylk.configuration_filename = process.config_file(options.config_file) pid_file = options.pid_file or default_pid # when run in foreground, do not require root access because of /var/run/sylkserver if not options.fork: process._runtime_directory = None else: try: process.runtime_directory = runtime_directory process.daemonize(pid_file) - except ProcessError, e: + except ProcessError as e: log.fatal('Cannot start {name}: {exception!s}'.format(name=fullname, exception=e)) sys.exit(1) log.start_syslog(name) from sylk.server import SylkServer log.info('Starting {name} {module.__version__}, using SIP SIMPLE SDK {sipsimple.__version__}'.format(name=fullname, module=sylk, sipsimple=sipsimple)) config_file = ConfigFile(sylk.configuration_filename) if config_file.files: log.info('Reading configuration from {}'.format(', '.join(config_file.files))) else: log.info('Not reading any configuration files (using internal defaults)') if not options.fork and options.debug_memory: import atexit from application.debug.memory import memory_dump atexit.register(memory_dump) server = SylkServer() def stop_server(*args): if not server.stopping_event.is_set(): log.info('Stopping {name}...'.format(name=fullname)) server.stop() process.signals.add_handler(signal.SIGTERM, stop_server) process.signals.add_handler(signal.SIGINT, stop_server) def toggle_debugging(*args): from sylk.configuration import ServerConfig if log.level.current != log.level.DEBUG: log.level.current = log.level.DEBUG log.info('Switched logging level to DEBUG') else: log.info('Switched logging level to {}'.format(ServerConfig.log_level)) log.level.current = ServerConfig.log_level process.signals.add_handler(signal.SIGUSR1, toggle_debugging) try: server.start(options) - except Exception, e: + except Exception as e: log.fatal('Failed to run {name}: {exception!s}'.format(name=fullname, exception=e)) log.exception() sys.exit(1) else: while not server.stopping_event.wait(9999): pass server.stop_event.wait(5) if server.stop_event.is_set(): log.info('{name} stopped'.format(name=fullname)) else: log.info('Forcefully exiting {name}...'.format(name=fullname)) os._exit(1) sys.exit(int(server.failed)) if __name__ == "__main__": main() diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index ccd683b..8a66b36 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,411 +1,411 @@ import os import re import shutil from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup from sipsimple.streams import MediaStreamRegistry from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications import SylkApplication from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.applications.conference.room import Room from sylk.applications.conference.web import ConferenceWeb from sylk.bonjour import BonjourService from sylk.configuration import ServerConfig, ThorNodeConfig from sylk.session import Session, IllegalStateError from sylk.web import server as web_server class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self._rooms = {} self.invited_participants_map = {} self.bonjour_focus_service = Null self.bonjour_room_service = Null self.web = Null def start(self): self.web = ConferenceWeb(self) web_server.register_resource('conference', self.web.resource()) # cleanup old files for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screensharing_images_dir): try: shutil.rmtree(path) except EnvironmentError: pass if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference': self.bonjour_focus_service = BonjourService(service='sipfocus') self.bonjour_focus_service.start() log.info("Bonjour publication started for service 'sipfocus'") self.bonjour_room_service = BonjourService(service='sipuri', name='Conference Room', uri_user='conference') self.bonjour_room_service.start() self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants') log.info("Bonjour publication started for service 'sipuri'") def stop(self): self.bonjour_focus_service.stop() self.bonjour_room_service.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.info('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: log.info(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported') session.reject(488) return audio_stream = audio_streams[0] if audio_streams else None chat_stream = chat_streams[0] if chat_streams else None transfer_stream = transfer_streams[0] if transfer_streams else None try: self.validate_acl(session.request_uri, session.remote_identity.uri) except ACLValidationError: log.info(u'Session rejected: unauthorized by access list') session.reject(403) return if transfer_stream is not None: try: room = self.get_room(session.request_uri) except RoomNotFoundError: log.info(u'Session rejected: room not found') session.reject(404) return if transfer_stream.direction == 'sendonly': # file transfer 'pull' try: file = next(file for file in room.files if file.hash == transfer_stream.file_selector.hash) except StopIteration: log.info(u'Session rejected: requested file not found') session.reject(404) return try: transfer_stream.file_selector = file.file_selector - except EnvironmentError, e: + except EnvironmentError as e: log.info(u'Session rejected: error opening requested file: %s' % e) session.reject(404) return else: transfer_stream.handler.save_directory = os.path.join(ConferenceConfig.file_transfer_dir.normalized, room.uri) NotificationCenter().add_observer(self, sender=session) if audio_stream: session.send_ring_indication() streams = [stream for stream in (audio_stream, chat_stream, transfer_stream) if stream] reactor.callLater(4 if audio_stream is not None else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if subscribe_request.event != 'conference': log.info(u'Subscription for event %s rejected: only conference event is supported' % subscribe_request.event) subscribe_request.reject(489) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): log.info(u'Subscription rejected: unauthorized by access list') subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: log.info(u'Subscription rejected: room not yet created') subscribe_request.reject(480) return if not room.started: log.info(u'Subscription rejected: room not started yet') subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return log.info(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri)) try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: log.info(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri) refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_message(self, message_request, data): log.info(u'SIP MESSAGE is not supported, use MSRP media instead') message_request.answer(405) def accept_session(self, session, streams): if session.state == 'incoming': try: session.accept(streams, is_focus=True) except IllegalStateError: pass def add_participant(self, session, room_uri): # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) log.info(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri)) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 NotificationCenter().add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: log.info('Room %s - %s removed from conference' % (room_uri, participant_uri)) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender room = self.get_room(session.request_uri, True) room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) if session.direction == 'incoming': room_uri = session.request_uri else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) log.info(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason)) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.request_uri self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host) self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'INVITE': self._refer_request.accept() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'BYE': log.info('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = DefaultAccount() conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return active_media = set(room.active_media).intersection(('audio', 'chat')) if not active_media: log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return for stream_type in active_media: self.streams.append(MediaStreamRegistry.get(stream_type)()) self.session = Session(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity)) route = notification.data.result[0] self.session.connect(from_header, to_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) log.info('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): log.info('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 5681f6d..8a1550a 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1079 +1,1079 @@ import os import random import shutil import string import weakref from collections import Counter, deque from glob import glob from itertools import chain, count, cycle from application.notification import IObserver, NotificationCenter from application.python import Null from application.system import makedirs from eventlib import api, coros, proc from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.application import SIPApplication from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatIdentity, CPIMHeader, CPIMNamespace from sipsimple.streams.msrp.filetransfer import FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.bonjour import BonjourService from sylk.configuration import ServerConfig, ThorNodeConfig from sylk.configuration.datatypes import URL from sylk.resources import Resources from sylk.session import Session, IllegalStateError from sylk.web import server as web_server def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s <%s@%s>' % (identity.display_name, uri.user, uri.host) else: return u'%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.room_uri = room.uri self.sender = sender self.filename = os.path.join(ConferenceConfig.screensharing_images_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) self.url = URL(web_server.url + '/conference/' + room.uri + '/screensharing') self.url.query_items['image'] = os.path.basename(self.filename) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) - except EnvironmentError, e: + except EnvironmentError as e: log.info('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url) room.dispatch_server_message(txt) log.info(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender) room.dispatch_server_message(txt) log.info(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.config = get_room_config(uri) self.uri = uri self.identity = ChatIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room') self.files = [] self.screen_images = {} self.sessions = [] self.subscriptions = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.conference_info_version = count(1) self.bonjour_services = Null self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = Counter() self.history = deque(maxlen=ConferenceConfig.history_size) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) @property def conference_info(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) conference_description.conf_uris = conference.ConfUris() conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation')) if self.config.advertise_xmpp_support: conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation')) # TODO: add grouptextchat service uri for number in self.config.pstn_access_numbers: conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation')) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len(self.participants_counter) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) user = conference.User(str(session.remote_identity.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, 'OK') for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def start(self): if self.started: return if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference': room_user = self.identity.uri.user self.bonjour_services = BonjourService(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screensharing_images_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: continue if message.content.startswith('?OTR:'): continue if message.timestamp is None: message.timestamp = ISOTimestamp.utcnow() message.sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), message.sender.display_name) recipient = message.recipients[0] private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_message(session, message) else: self.history.append(message) self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: continue recipient = data.recipients[0] private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity, recipients=[self.identity]) def dispatch_private_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity) def dispatch_server_message(self, content, content_type='text/plain', exclude=None): ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity], additional_headers=[message_type]) def dispatch_conference_info(self): data = self.conference_info for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = file.sender.uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = FileTransferHandler(self) handler.init_outgoing(uri, file) def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate, audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) if audio_stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if audio_stream.encryption.active: log.info(u'Room %s - %s audio stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), audio_stream.encryption.type)) else: log.info(u'Room %s - %s audio stream did not enable encryption' % (self.uri, format_identity(session.remote_identity))) try: transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: transfer_handler = FileTransferHandler(self) transfer_handler.init_incoming(transfer_stream) if transfer_stream.direction == 'recvonly': filename = os.path.basename(os.path.splitext(transfer_stream.file_selector.name)[0]) txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), filename,self.format_file_size(transfer_stream.file_selector.size)) else: filename = os.path.basename(transfer_stream.file_selector.name) txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), filename) log.info(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, initial=True, session=session, streams=session.streams) welcome_handler.run() self.dispatch_conference_info() if len(self.sessions) == 1: log.info(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) else: log.info(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] == 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.info(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session))) if not self.sessions: log.info(u'Room %s - Last participant left conference' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def handle_incoming_subscription(self, subscribe_request, data): log.info('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri)) if subscribe_request.event != 'conference': log.info('Room %s - Subscription for event %s rejected: only conference event is supported' % (self.uri, subscribe_request.event)) subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) try: subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info) - except SIPCoreError, e: + except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscribe_request.end() def _accept_proposal(self, session, streams): try: session.accept_proposal(streams) except IllegalStateError: pass session.proposal_timer = None def add_file(self, file): self.dispatch_server_message('%s has uploaded file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) def _update_bonjour_presence(self): num = len(self.sessions) if num == 0: num_str = 'No' elif num == 1: num_str = 'One' elif num == 2: num_str = 'Two' else: num_str = str(num) txt = u'%s participant%s' % (num_str, '' if num==1 else 's') presence_state = BonjourPresenceState('available', txt) if self.bonjour_services is Null: # This is the room being published all the time from sylk.applications.conference import ConferenceApplication ConferenceApplication().bonjour_room_service.presence_state = presence_state else: self.bonjour_services.presence_state = presence_state @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): stream = notification.sender session = stream.session log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) def _NH_RTPStreamDidNotEnableEncryption(self, notification): stream = notification.sender session = stream.session log.info(u'Room %s - %s %s stream did not enable encryption: %s' % (self.uri, format_identity(session.remote_identity), stream.type, notification.data.reason)) def _NH_RTPStreamZRTPReceivedSAS(self, notification): if not self.config.zrtp_auto_verify: return stream = notification.sender session = stream.session sas = notification.data.sas # Send ZRTP SAS over the chat stream, if available try: chat_stream = next(stream for stream in session.streams if stream.type=='chat') except StopIteration: return # Only send the message if there are no relays in between secure_chat = chat_stream.transport == 'tls' and all(len(path)==1 for path in (chat_stream.msrp.full_local_path, chat_stream.msrp.full_remote_path)) if secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') chat_stream.send_message(txt, 'text/plain', sender=self.identity, additional_headers=[message_type]) def _NH_RTPStreamDidTimeout(self, notification): stream = notification.sender if stream.type != 'audio': return session = stream.session log.info(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity))) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith(('text/', 'image/')): stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.add_screen_image(message.sender, message.content) elif content_type == 'application/blink-zrtp-sas': if not self.config.zrtp_auto_verify: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return try: audio_stream = next(stream for stream in session.streams if stream.type=='audio' and stream.encryption.active and stream.encryption.type=='ZRTP') except StopIteration: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return # Only trust it if there was a direct path and the transport is TLS secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) remote_sas = str(message.content) if remote_sas == audio_stream.encryption.zrtp.sas and secure_chat: audio_stream.encryption.zrtp.verified = True stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and (session not in self.session_nickname_map or self.session_nickname_map[session] != nickname): notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender try: self.subscriptions.remove(subscription) except ValueError: pass else: notification.center.remove_observer(self, sender=subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.info(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity))) else: log.info(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity))) self.dispatch_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(3, self._accept_proposal, session, streams) old_timer = getattr(session, 'proposal_timer', None) assert old_timer is None session.proposal_timer = timer def _NH_SIPSessionProposalRejected(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None: timer.cancel() session.proposal_timer = None def _NH_SIPSessionHadProposalFailure(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) assert timer is not None timer.cancel() session.proposal_timer = None def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type) log.info(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate, stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) if stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if stream.encryption.active: log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) else: log.info(u'Room %s - %s %s stream did not enable encryption' % (self.uri, format_identity(session.remote_identity), stream.type)) if notification.data.added_streams: welcome_handler = WelcomeHandler(self, initial=False, session=session, streams=notification.data.added_streams) welcome_handler.run() for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type) log.info(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() if not session.streams: log.info(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity))) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): log.info(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri) notification.sender.reject_transfer(403) def _NH_SIPSessionWillEnd(self, notification): session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None and timer.active(): timer.cancel() session.proposal_timer = None @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % Resources.get('sounds/moh')) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20) self.paused = True self.conference.bridge.add(self._player) NotificationCenter().add_observer(self, sender=self._player) def stop(self): if self._player is None: return NotificationCenter().remove_observer(self, sender=self._player) self._player.stop() self.paused = True self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() def _play_next_file(self): self._player.filename = next(self.files) self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail class WelcomeHandler(object): implements(IObserver) def __init__(self, room, initial, session, streams): self.room = room self.initial = initial self.session = session self.streams = streams self.procs = proc.RunningProcSet() @run_in_green_thread def run(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) for stream in self.streams: if stream.type == 'audio': self.procs.spawn(self.audio_welcome, stream) elif stream.type == 'chat': self.procs.spawn(self.chat_welcome, stream) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.streams = None self.room = None self.procs = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() - except WavePlayerError, e: + except WavePlayerError as e: log.warning(u'Error playing file %s: %s' % (file, e)) def audio_welcome(self, stream): player = WavePlayer(stream.mixer, '', pause_time=1, initial_delay=1, volume=50) stream.bridge.add(player) try: if self.initial: file = Resources.get('sounds/co_welcome_conference.wav') self.play_file_in_player(player, file, 1) user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri and any(stream for stream in s.streams if stream.type == 'audio')}) if user_count == 0: file = Resources.get('sounds/co_only_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = Resources.get('sounds/co_there_is_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = Resources.get('sounds/co_there_are.wav') self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = Resources.get('sounds/bi_%d.wav' % user_count) self.play_file_in_player(player, file, 0.1) else: file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/bi_%d.wav' % (user_count % 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/co_more_participants.wav') self.play_file_in_player(player, file, 0) file = Resources.get('sounds/connected_tone.wav') self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: stream.bridge.remove(player) self.room.audio_conference.add(stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() finally: player.stop() def chat_welcome(self, stream): if self.initial: txt = 'Welcome to SylkServer!' else: txt = '' user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri}) if user_count == 0: txt += ' You are the first participant' else: if user_count == 1: txt += ' There is one more participant' else: txt += ' There are %s more participants' % user_count txt += ' in this conference room.' if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers or self.room.config.webrtc_gateway_url: txt += '\n\nOther participants can join at these addresses:\n\n' if self.room.config.pstn_access_numbers: if len(self.room.config.pstn_access_numbers) == 1: nums = self.room.config.pstn_access_numbers[0] else: nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1] txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums if self.room.config.advertise_xmpp_support: txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri if self.room.config.webrtc_gateway_url: webrtc_url = str(self.room.config.webrtc_gateway_url).replace('$room', self.room.uri) txt += ' - Using a WebRTC enabled browser go to %s\n' % webrtc_url stream.send_message(txt, 'text/plain', sender=self.room.identity, recipients=[self.room.identity]) for msg in self.room.history: stream.send_message(msg.content, msg.content_type, sender=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp) # Send ZRTP SAS over the chat stream, if applicable if self.room.config.zrtp_auto_verify: session = stream.session try: audio_stream = next(stream for stream in session.streams if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: # Only send the message if there are no relays in between secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) sas = audio_stream.encryption.zrtp.sas if sas is not None and secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') stream.send_message(txt, 'text/plain', sender=self.room.identity, additional_headers=[message_type]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall() class RoomFile(object): def __init__(self, name, hash, size, sender): self.name = name self.hash = hash self.size = size self.sender = sender @property def file_selector(self): return FileSelector.for_file(self.name, hash=self.hash) class FileTransferHandler(object): implements(IObserver) def __init__(self, room): self.room = weakref.ref(room) self.session = None self.stream = None self.handler = None self.direction = None def init_incoming(self, stream): self.direction = 'incoming' self.stream = stream self.session = stream.session self.handler = stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) @run_in_green_thread def init_outgoing(self, destination, file): self.direction = 'outgoing' room = self.room() if room is None: return settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(destination) lookup = DNSLookup() try: route = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()[0] except (DNSLookupError, IndexError): return self.session = Session(account) self.stream = MediaStreamRegistry.get('file-transfer')(file.file_selector, 'sendonly') self.handler = self.stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) from_header = FromHeader(SIPURI.new(room.identity.uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(destination)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(file.sender.uri))) extra_headers.append(SubjectHeader(u'File uploaded by %s' % file.sender)) self.session.connect(from_header, to_header, route=route, streams=[self.stream], is_focus=True, extra_headers=extra_headers) def _terminate(self, failure_reason=None): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.handler) room = self.room() if room is not None: if failure_reason is None: if self.direction == 'incoming' and self.stream.direction == 'recvonly': sender = ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) file = RoomFile(self.stream.file_selector.name, self.stream.file_selector.hash, self.stream.file_selector.size, sender) room.add_file(file) else: room.dispatch_server_message('File transfer for %s failed: %s' % (os.path.basename(self.stream.file_selector.name), failure_reason)) self.session = None self.stream = None self.handler = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): self._terminate(failure_reason=notification.data.reason) def _NH_FileTransferHandlerDidEnd(self, notification): if self.direction == 'incoming': if self.stream.direction == 'sendonly': reactor.callLater(3, self.session.end) else: reactor.callLater(1, self.session.end) else: self.session.end() self._terminate(failure_reason=notification.data.reason) diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py index 08c05fe..3aaa7b1 100644 --- a/sylk/applications/ircconference/room.py +++ b/sylk/applications/ircconference/room.py @@ -1,672 +1,672 @@ import random import urllib import lxml.html import lxml.html.clean from itertools import count from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from eventlib import coros, proc from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage from sipsimple.streams.msrp.chat import ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from twisted.internet import protocol, reactor from twisted.words.protocols import irc from zope.interface import implements from sylk.applications.ircconference.configuration import get_room_configuration from sylk.applications.ircconference.logger import log from sylk.resources import Resources def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type def html2text(data): try: doc = lxml.html.document_fromstring(data) cleaner = lxml.html.clean.Cleaner(style=True) doc = cleaner.clean_html(doc) return doc.text_content().strip('\n') except Exception: return '' class IRCMessage(object): def __init__(self, username, uri, content, content_type='text/plain'): self.sender = ChatIdentity(uri, display_name=username) self.content = content self.content_type = content_type class IRCRoom(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ __metaclass__ = Singleton implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = ChatIdentity.parse('' % self.uri) self.sessions = [] self.sessions_with_proposals = [] self.subscriptions = [] self.pending_messages = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.conference_info_payload = None self.conference_info_version = count(1) self.irc_connector = None self.irc_protocol = None @classmethod def get_room(cls, uri): room_uri = '%s@%s' % (uri.user, uri.host) room = cls(room_uri) return room @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' def start(self): if self.state != 'stopped': return config = get_room_configuration(self.uri.split('@')[0]) factory = IRCBotFactory(config) host, port = config.server self.irc_connector = reactor.connectTCP(host, port, factory) NotificationCenter().add_observer(self, sender=self.irc_connector.factory) self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.state = 'started' def stop(self): if self.state != 'started': return self.state = 'stopped' NotificationCenter().remove_observer(self, sender=self.irc_connector.factory) self.irc_connector.factory.stop_requested = True self.irc_connector.disconnect() self.irc_connector = None self.message_dispatcher.kill(proc.ProcExit) self.audio_conference = None def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'msrp_message': if data.sender.uri != session.remote_identity.uri: return self.dispatch_message(session, data) elif message_type == 'irc_message': self.dispatch_irc_message(data) def dispatch_message(self, session, message): identity = ChatIdentity.parse(format_identity(session.remote_identity)) for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(message.content, message.content_type, sender=identity, recipients=[self.identity], timestamp=message.timestamp) def dispatch_irc_message(self, message): for session in self.sessions: try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity]) def dispatch_server_message(self, content, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity]) def get_conference_info(self): # Send request to get participants list, we'll get a notification with it if self.irc_protocol is not None: self.irc_protocol.get_participants() else: self.dispatch_conference_info([]) def dispatch_conference_info(self, irc_participants): data = self.build_conference_info_payload(irc_participants) for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def build_conference_info_payload(self, irc_participants): irc_configuration = get_room_configuration(self.uri.split('@')[0]) if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent) host_info = HostInfo(web_page=WebPage(irc_configuration.website)) self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len({str(s.remote_identity.uri) for s in self.sessions}) + len(irc_participants) self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True) users = Users() for session in self.sessions: try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name) users.add(user) joining_info = JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected') endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status) for stream in session.streams: endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream))) user.add(endpoint) for nick in irc_participants: irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0]) user = User(irc_uri, display_text=nick) users.add(user) endpoint = Endpoint(irc_uri, display_text=nick) endpoint.add(Media(random.randint(100000000, 999999999), media_type='message')) user.add(endpoint) self.conference_info_payload.users = users return self.conference_info_payload.toxml() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.info(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.get_conference_info() if len(self.sessions) == 1: log.info(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) else: log.info(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.get_conference_info() log.info(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session))) if not self.sessions: log.info(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session))) def accept_proposal(self, session, streams): if session in self.sessions_with_proposals: session.accept_proposal(streams) self.sessions_with_proposals.remove(session) def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) try: subscribe_request.accept() - except SIPCoreError, e: + except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscribe_request.end() self.get_conference_info() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) self.subscriptions.remove(subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.info(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.info(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.get_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return if chat_streams: chat_streams[0].chatroom_capabilities = [] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] self.sessions_with_proposals.append(session) reactor.callLater(4, self.accept_proposal, session, streams) def _NH_SIPSessionProposalRejected(self, notification): session = notification.sender self.sessions_with_proposals.remove(session) def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) log.info(u'%s has added %s to %s' % (format_identity(session.remote_identity), stream.type, self.uri)) self.dispatch_server_message('%s has added %s' % (format_identity(session.remote_identity), stream.type), exclude=session) if stream.type == 'audio': log.info(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (stream.codec, stream.sample_rate, stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) log.info(u'%s has removed %s from %s' % (format_identity(session.remote_identity), stream.type, self.uri)) self.dispatch_server_message('%s has removed %s' % (format_identity(session.remote_identity), stream.type), exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() if not session.streams: log.info(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.get_conference_info() def _NH_RTPStreamDidTimeout(self, notification): stream = notification.sender if stream.type != 'audio': return session = stream.session log.info(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender message = notification.data.message if message.content_type not in ('text/html', 'text/plain'): log.info(u'Unsupported content type: %s, ignoring message' % message.content_type) stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') # Send MSRP chat message to other participants session = stream.session self.incoming_message_queue.send((session, 'msrp_message', message)) # Send MSRP chat message to IRC chat room if message.content_type == 'text/html': content = html2text(message.content) elif message.content_type == 'text/plain': content = message.content else: log.warning('unexpected message type: %s' % message.content_type) return sender = message.sender irc_message = '%s: %s' % (format_identity(sender), content) if self.irc_protocol is not None: self.irc_protocol.send_message(irc_message.encode('utf-8')) else: self.pending_messages.append(irc_message) def _NH_ChatStreamGotNicknameRequest(self, notification): # Discard the nickname but pretend we accept it so that XMPP clients can work chunk = notification.data.chunk notification.sender.accept_nickname(chunk) def _NH_IRCBotGotConnected(self, notification): self.irc_protocol = notification.data.protocol # Send enqueued messages while self.pending_messages: message = self.pending_messages.pop(0) self.irc_protocol.send_message(message.encode('utf-8')) # Update participants list self.get_conference_info() def _NH_IRCBotGotDisconnected(self, notification): self.irc_protocol = None def _NH_IRCBotGotMessage(self, notification): message = notification.data.message self.incoming_message_queue.send((None, 'irc_message', message)) def _NH_IRCBotGotParticipantsList(self, notification): self.dispatch_conference_info(notification.data.participants) def _NH_IRCBotJoinedChannel(self, notification): self.get_conference_info() def _NH_IRCBotUserJoined(self, notification): self.dispatch_server_message('%s joined the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserLeft(self, notification): self.dispatch_server_message('%s left the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserQuit(self, notification): self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason)) self.get_conference_info() def _NH_IRCBotUserKicked(self, notification): data = notification.data self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason)) self.get_conference_info() def _NH_IRCBotUserRenamed(self, notification): self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname)) self.get_conference_info() def _NH_IRCBotUserAction(self, notification): self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action)) class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.proc = None @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.proc = proc.spawn(self.play_audio_welcome, welcome_prompt) self.proc.wait() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None self.proc = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() - except WavePlayerError, e: + except WavePlayerError as e: log.warning(u'Error playing file %s: %s' % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio') except StopIteration: return player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50) audio_stream.bridge.add(player) try: if welcome_prompt: file = Resources.get('sounds/co_welcome_conference.wav') self.play_file_in_player(player, file, 1) user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri and any(stream for stream in s.streams if stream.type == 'audio')}) if user_count == 0: file = Resources.get('sounds/co_only_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = Resources.get('sounds/co_there_is_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = Resources.get('sounds/co_there_are.wav') self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = Resources.get('sounds/bi_%d.wav' % user_count) self.play_file_in_player(player, file, 0.1) else: file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/bi_%d.wav' % (user_count % 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/co_more_participants.wav') self.play_file_in_player(player, file, 0) file = Resources.get('sounds/connected_tone.wav') self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: audio_stream.bridge.remove(player) self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() finally: player.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.proc.kill() class IRCBot(irc.IRCClient): nickname = 'SylkServer' def __init__(self): self._nick_collector = [] self.nicks = [] def connectionMade(self): irc.IRCClient.connectionMade(self) log.info('Connection to IRC has been established') NotificationCenter().post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self)) def connectionLost(self, failure): irc.IRCClient.connectionLost(self, failure) NotificationCenter().post_notification('IRCBotGotDisconnected', self.factory, NotificationData()) def signedOn(self): log.info('Logging into %s channel...' % self.factory.channel) self.join(self.factory.channel) def kickedFrom(self, channel, kicker, message): log.info('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message)) self.join(self.factory.channel) def joined(self, channel): log.info('Logged into %s channel' % channel) NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, NotificationData(channel=self.factory.channel)) def privmsg(self, user, channel, message): if channel == '*': return username = user.split('!', 1)[0] if username == self.nickname: return if channel == self.nickname: self.msg(username, "Sorry, I don't support private messages, I'm a bot.") return uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0])) irc_message = IRCMessage(username, uri, message.decode('utf-8')) data = NotificationData(message=irc_message) NotificationCenter().post_notification('IRCBotGotMessage', self.factory, data) def send_message(self, message): self.say(self.factory.channel, message) def get_participants(self): self.sendLine("NAMES #%s" % self.factory.channel) def got_participants(self, nicks): data = NotificationData(participants=nicks) NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, data) def irc_RPL_NAMREPLY(self, prefix, params): """Collect usernames from this channel. Several of these messages may be sent to cover the channel's full nicklist. An RPL_ENDOFNAMES signals the end of the list. """ # We just separate these into individual nicks and stuff them in # the nickCollector, transferred to 'nicks' when we get the RPL_ENDOFNAMES. for name in params[3].split(): # Remove operator and voice prefixes if name[0] in '@+': name = name[1:] if name != self.nickname: self._nick_collector.append(name) def irc_RPL_ENDOFNAMES(self, prefix, params): """This is sent after zero or more RPL_NAMREPLY commands to terminate the list of users in a channel. """ self.nicks = self._nick_collector self._nick_collector = [] self.got_participants(self.nicks) def userJoined(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserJoined', self.factory, data) def userLeft(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserLeft', self.factory, data) def userQuit(self, user, reason): data = NotificationData(user=user, reason=reason) NotificationCenter().post_notification('IRCBotUserQuit', self.factory, data) def userKicked(self, kickee, channel, kicker, message): if channel.strip('#') == self.factory.channel: data = NotificationData(kickee=kickee, kicker=kicker, reason=message) NotificationCenter().post_notification('IRCBotUserKicked', self.factory, data) def userRenamed(self, oldname, newname): data = NotificationData(oldname=oldname, newname=newname) NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, data) def action(self, user, channel, data): if channel.strip('#') == self.factory.channel: username = user.split('!', 1)[0] data = NotificationData(user=username, action=data) NotificationCenter().post_notification('IRCBotUserAction', self.factory, data) class IRCBotFactory(protocol.ClientFactory): protocol = IRCBot def __init__(self, config): self.config = config self.channel = config.channel self.stop_requested = False def clientConnectionLost(self, connector, failure): log.info('Disconnected from IRC: %s' % failure.getErrorMessage()) if not self.stop_requested: log.info('Reconnecting...') connector.connect() def clientConnectionFailed(self, connector, failure): log.error('Connection to IRC server failed: %s' % failure.getErrorMessage()) diff --git a/sylk/applications/playback/__init__.py b/sylk/applications/playback/__init__.py index b626098..43dbb41 100644 --- a/sylk/applications/playback/__init__.py +++ b/sylk/applications/playback/__init__.py @@ -1,169 +1,169 @@ import os from application.python import Null from application.notification import IObserver, NotificationCenter from eventlib import proc from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.audio import WavePlayer, WavePlayerError from twisted.internet import reactor from zope.interface import implements from sylk.applications import SylkApplication, ApplicationLogger from sylk.applications.playback.configuration import get_config from sylk.bonjour import BonjourService from sylk.configuration import ServerConfig log = ApplicationLogger(__package__) class PlaybackApplication(SylkApplication): def start(self): self.bonjour_services = [] if ServerConfig.enable_bonjour: application_map = dict((item.split(':')) for item in ServerConfig.application_map) for uri, app in application_map.iteritems(): if app == 'playback': config = get_config('%s' % uri) if config is None: continue if os.path.isfile(config.file) and os.access(config.file, os.R_OK): service = BonjourService(service='sipuri', name='Playback Test', uri_user=uri, is_focus=False) service.start() service.presence_state = BonjourPresenceState('available', u'File: %s' % os.path.basename(config.file)) self.bonjour_services.append(service) def stop(self): for service in self.bonjour_services: service.stop() del self.bonjour_services[:] def incoming_session(self, session): log.info('Incoming session %s from %s to %s' % (session.call_id, session.remote_identity.uri, session.local_identity.uri)) config = get_config('%s@%s' % (session.request_uri.user, session.request_uri.host)) if config is None: config = get_config('%s' % session.request_uri.user) if config is None: log.info(u'Session %s rejected: no configuration found for %s' % (session.call_id, session.request_uri)) session.reject(488) return stream_types = {'audio'} if config.enable_video: stream_types.add('video') streams = [stream for stream in session.proposed_streams if stream.type in stream_types] if not streams: log.info(u'Session %s rejected: invalid media, only RTP audio and video is supported' % session.call_id) session.reject(488) return handler = PlaybackHandler(config, session) handler.run() def incoming_subscription(self, request, data): request.reject(405) def incoming_referral(self, request, data): request.reject(405) def incoming_message(self, request, data): request.reject(405) class PlaybackHandler(object): implements(IObserver) def __init__(self, config, session): self.config = config self.session = session self.proc = None def run(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.session.send_ring_indication() stream_types = {'audio'} if self.config.enable_video: stream_types.add('video') streams = [stream for stream in self.session.proposed_streams if stream.type in stream_types] reactor.callLater(self.config.answer_delay, self._accept_session, self.session, streams) def _accept_session(self, session, streams): if session.state == 'incoming': session.accept(streams) def _play(self): config = get_config('%s@%s' % (self.session.request_uri.user, self.session.request_uri.host)) if config is None: config = get_config('%s' % self.session.request_uri.user) try: audio_stream = next(stream for stream in self.session.streams if stream.type=='audio') except StopIteration: self.proc = None return player = WavePlayer(audio_stream.mixer, config.file) audio_stream.bridge.add(player) log.info(u'Playing file %s for session %s' % (config.file, self.session.call_id)) try: player.play().wait() - except (ValueError, WavePlayerError), e: + except (ValueError, WavePlayerError) as e: log.warning(u'Error playing file %s: %s' % (config.file, e)) except proc.ProcExit: pass finally: player.stop() self.proc = None audio_stream.bridge.remove(player) self.session.end() self.session = None def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender stream_types = {'audio'} if self.config.enable_video: stream_types.add('video') streams = [stream for stream in session.proposed_streams if stream.type in stream_types] if not streams: session.reject_proposal() return session.accept_proposal(streams) def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: log.info('Session %s added %s' % (session.call_id, stream.type)) for stream in notification.data.removed_streams: log.info('Session %s removed %s' % (session.call_id, stream.type)) if notification.data.added_streams and self.proc is None: self.proc = proc.spawn(self._play) if notification.data.removed_streams and not session.streams: session.end() def _NH_SIPSessionDidStart(self, notification): session = notification.sender log.info('Session %s started' % session.call_id) self.proc = proc.spawn(self._play) def _NH_SIPSessionDidFail(self, notification): session = notification.sender log.info('Session %s failed' % session.call_id) notification.center.remove_observer(self, sender=session) def _NH_SIPSessionWillEnd(self, notification): if self.proc: self.proc.kill() def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.info('Session %s ended' % session.call_id) notification.center.remove_observer(self, sender=session) diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py index 4e5fbbe..52eab6f 100644 --- a/sylk/applications/xmppgateway/muc.py +++ b/sylk/applications/xmppgateway/muc.py @@ -1,465 +1,465 @@ import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatStreamError, ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS from sylk.configuration import SIPConfig from sylk.session import Session class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class MucInvitationFailure(object): def __init__(self, code, reason): self.code = code self.reason = reason def __str__(self): return '%s (%s)' % (self.code, self.reason) class X2SMucInvitationHandler(object): implements(IObserver) def __init__(self, sender, recipient, participant): self.sender = sender self.recipient = recipient self.participant = participant self.active = False self.route = None self._channel = coros.queue() self._referral = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() sender_uri = self.sender.uri.as_sip_uri() recipient_uri = self.recipient.uri.as_sip_uri() participant_uri = self.participant.uri.as_sip_uri() try: # Lookup routes account = DefaultAccount() if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(recipient_uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() - except DNSLookupError, e: + except DNSLookupError as e: raise ReferralError(error='DNS lookup failed: %s' % e) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) refer_to_header = ReferToHeader(str(participant_uri)) refer_to_header.parameters['method'] = 'INVITE' referral = Referral(recipient_uri, FromHeader(sender_uri), ToHeader(recipient_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break - except SIPReferralDidFail, e: + except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break - except SIPReferralDidFail, e: + except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) finally: self.active = False - except ReferralError, e: + except ReferralError as e: self._failure = MucInvitationFailure(e.code, e.error) finally: notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._referral = None if self._failure is not None: notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self) def _refresh(self): account = DefaultAccount() transport = self.route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self._referral.refresh(contact_header=contact_header, timeout=2) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() class S2XMucInvitationHandler(object): implements(IObserver) def __init__(self, session, sender, recipient, inviter): self.session = session self.sender = sender self.recipient = recipient self.inviter = inviter self._timer = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._timer = reactor.callLater(90, self._timeout) notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self) def stop(self): if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None notification_center = NotificationCenter() if self.session is not None: notification_center.remove_observer(self, sender=self.session) reactor.callLater(5, self._end_session, self.session) self.session = None if self._failure is not None: notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self) def _end_session(self, session): try: session.end(480) except Exception: pass def _timeout(self): NotificationCenter().remove_observer(self, sender=self.session) try: self.session.end(408) except Exception: pass self.session = None self._failure = MucInvitationFailure('Timeout', 408) self.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidFail(self, notification): notification.center.remove_observer(self, sender=self.session) self.session = None self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code) self.stop() class X2SMucHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity, nickname): self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.nickname = nickname self._xmpp_muc_session = None self._sip_session = None self._msrp_stream = None self._first_stanza = None self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza self._participants = set() # set of (URI, nickname) tuples self.ended = False def start(self): notification_center = NotificationCenter() self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session.start() notification_center.post_notification('X2SMucHandlerDidStart', sender=self) self._start_sip_session() def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_muc_session is not None: notification_center.remove_observer(self, sender=self._xmpp_muc_session) # Send indication that the user has been kicked from the room sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('307') xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._xmpp_muc_session.end() self._xmpp_muc_session = None if self._sip_session is not None: notification_center.remove_observer(self, sender=self._sip_session) self._sip_session.end() self._sip_session = None self.ended = True notification_center.post_notification('X2SMucHandlerDidEnd', sender=self) @run_in_green_thread def _start_sip_session(self): # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = self.sip_identity.uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) self.end() return self._msrp_stream = MediaStreamRegistry.get('chat')() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self._sip_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._sip_session) notification_center.add_observer(self, sender=self._msrp_stream) self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP multiparty session %s started" % self._sip_session.call_id) if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: self.end() return message_id = self._msrp_stream.set_local_nickname(self.nickname) self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) self._first_stanza = None def _NH_SIPSessionDidEnd(self, notification): log.info("SIP multiparty session %s ended" % self._sip_session.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP multiparty session %s failed" % self._sip_session.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self._sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self._sip_session.reject_transfer(403) def _NH_SIPSessionGotConferenceInfo(self, notification): # Translate to XMPP payload xmpp_manager = XMPPManager() own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) conference_info = notification.data.conference_info new_participants = set() for user in conference_info.users: user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) nickname = user.display_text.value if user.display_text else user.entity new_participants.add((user_uri, nickname)) # Remove participants that are no longer in the room for uri, nickname in self._participants - new_participants: sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) xmpp_manager.send_muc_stanza(stanza) # Send presence for current participants for uri, nickname in new_participants: if uri == own_uri: continue sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = Identity(uri) xmpp_manager.send_muc_stanza(stanza) self._participants = new_participants # Send own status last sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('110') xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream if not self._xmpp_muc_session: return message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.content else: html_body = message.content body = None resource = message.sender.display_name or str(message.sender.uri) sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex) self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') def _NH_ChatStreamDidSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) self.nickname = nickname def _NH_ChatStreamDidNotSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)]) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(error_stanza) def _NH_ChatStreamDidDeliverMessage(self, notification): # Echo back the message to the sender stanza = self._pending_messages_map.pop(notification.data.message_id) stanza.sender, stanza.recipient = stanza.recipient, stanza.sender stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamDidNotDeliverMessage(self, notification): self._pending_messages_map.pop(notification.data.message_id) def _NH_XMPPIncomingMucSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session = None self.end() def _NH_XMPPIncomingMucSessionGotMessage(self, notification): if not self._sip_session: return message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = ChatIdentity(sender_uri, display_name=self.nickname) message_id = self._msrp_stream.send_message(message.body, 'text/plain', sender=sender) self._pending_messages_map[message_id] = message # Message will be echoed back to the sender on ChatStreamDidDeliverMessage def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): if not self._sip_session: return nickname = notification.data.nickname try: message_id = self._msrp_stream.set_local_nickname(nickname) except ChatStreamError: return self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index 20b7b77..db3a3a8 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,520 +1,520 @@ import hashlib import random from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid, caps from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription from sylk.configuration import SIPConfig __all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self) def add_sip_subscription(self, subscription): # If s subscription is received after the handle has ended but before # S2XPresenceHandlerDidEnd has been processed we need to ignore it and wait for a retransmission # which we will handle by creating a new S2XPresenceHandler if self.ended: return self._sip_subscriptions.append(subscription) NotificationCenter().add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None try: subscription.accept(content_type, pidf_doc) - except SIPCoreError, e: + except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() else: try: subscription.accept_pending() - except SIPCoreError, e: + except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = next(self._stanza_cache.iterkeys()) person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': status.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: status.extended = 'available' if stanza.sender.uri.resource: resource = encode_resource(stanza.sender.uri.resource) else: # Workaround for clients not sending the resource under certain (unknown) circumstances resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest() service_id = "SID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource sip_uri.parameters['xmpp'] = None contact = pidf.Contact(str(sip_uri)) service = pidf.Service(service_id, status=status, contact=contact) service.add(pidf.DeviceID(resource)) service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource) service.timestamp = pidf.ServiceTimestamp(stanza.timestamp) service.capabilities = caps.ServiceCapabilities(text=True, message=True) for lang, note in stanza.statuses.iteritems(): service.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(service) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification.center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) if not self._sip_subscriptions: self.end() def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification): if XMPPGatewayConfig.log_presence: log.info('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), notification.data.code, notification.data.reason)) def _NH_SIPIncomingSubscriptionGotUnsubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s was terminated by user for presence flow 1x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionGotRefreshingSubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s was refreshed for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionDidTimeout(self, notification): if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s timed out for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza stanza.timestamp = ISOTimestamp.now() # TODO: mirror the one in the stanza, if present pidf_doc = self._build_pidf() if XMPPGatewayConfig.log_presence: log.info('XMPP notification from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self))) for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) - except SIPCoreError, e: + except SIPCoreError as e: if XMPPGatewayConfig.log_presence: log.info('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e)) if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification.center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) - except ParserError, e: + except ParserError as e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = next(p for p in pidf_doc.persons) except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id else: sip_contact.parameters['gr'] = service.id sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' elif {'holiday', 'vacation'}.intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = DefaultAccount() refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() - except DNSLookupError, e: + except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.uri), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break - except SIPSubscriptionDidFail, e: + except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if self._xmpp_subscription is None: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: if XMPPGatewayConfig.log_presence: log.info('SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'))) self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break - except SIPSubscriptionDidFail, e: + except SIPSubscriptionDidFail as e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) - except InterruptSubscription, e: + except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass - except TerminateSubscription, e: + except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) - except SubscriptionError, e: + except SubscriptionError as e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe diff --git a/sylk/applications/xmppgateway/xmpp/jingle/session.py b/sylk/applications/xmppgateway/xmpp/jingle/session.py index e16fe69..59b1181 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/session.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/session.py @@ -1,806 +1,806 @@ import random import string from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from cStringIO import StringIO from datetime import datetime from eventlib import api, coros, proc from eventlib.twistedutil import block_on from lxml import etree from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPSession, SDPMediaStream, SDPConnection, SDPNegotiator from sipsimple.core import SIPCoreError from sipsimple.threading import run_in_twisted_thread from twisted.internet import reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import TimeoutError as IqTimeoutError from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sylk.applications.xmppgateway.xmpp.jingle.util import jingle_to_sdp, sdp_to_jingle from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.configuration import SIPConfig def random_id(): return ''.join(random.choice(string.ascii_letters+string.digits) for x in xrange(32)) class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class Operation(object): __params__ = () def __init__(self, **params): for name, value in params.iteritems(): setattr(self, name, value) for param in set(self.__params__).difference(params): raise ValueError("missing operation parameter: '%s'" % param) self.channel = coros.queue() class AcceptOperation(Operation): __params__ = ('streams', 'is_focus') class SendRingIndicationOperation(Operation): __params__ = () class RejectOperation(Operation): __params__ = ('reason',) class EndOperation(Operation): __params__ = () class HoldOperation(Operation): __params__ = () class UnholdOperation(Operation): __params__ = () class ProcessRemoteOperation(Operation): __params__ = ('notification',) class ConnectOperation(Operation): __params__ = ('sender', 'recipient', 'streams', 'is_focus') class SendConferenceInfoOperation(Operation): __params__ = ('xml',) class JingleSession(object): implements(IObserver) jingle_stanza_timeout = 3 media_stream_timeout = 15 def __init__(self, protocol): self.account = DefaultAccount() self._protocol = protocol self._id = None self._local_identity = None self._remote_identity = None self._local_jid = None self._remote_jid = None self._channel = coros.queue() self._current_operation = None self._proc = proc.spawn(self._run) self._timer = None self._sdp_negotiator = None self._pending_transport_info_stanzas = [] self.direction = None self.state = None self.streams = None self.proposed_streams = None self.start_time = None self.end_time = None self.on_hold = False self.local_focus = False def init_incoming(self, stanza): self._id = stanza.jingle.sid self._local_identity = Identity(FrozenURI.parse(stanza.recipient)) self._remote_identity = Identity(FrozenURI.parse(stanza.sender)) self._local_jid = self._local_identity.uri.as_xmpp_jid() self._remote_jid = self._remote_identity.uri.as_xmpp_jid() remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator = SDPNegotiator.create_with_remote_offer(remote_sdp) - except SIPCoreError, e: + except SIPCoreError as e: self._fail(originator='local', reason='general-error', description=str(e)) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self.direction = 'incoming' self.state = 'incoming' NotificationCenter().post_notification('JingleSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams)) else: self._fail(originator='local', reason='unsupported-applications') def connect(self, sender_identity, recipient_identity, streams, is_focus=False): self._schedule_operation(ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams, is_focus=is_focus)) def send_ring_indication(self): self._schedule_operation(SendRingIndicationOperation()) def accept(self, streams, is_focus=False): self._schedule_operation(AcceptOperation(streams=streams, is_focus=is_focus)) def reject(self, reason='busy'): self._schedule_operation(RejectOperation(reason=reason)) def hold(self): self._schedule_operation(HoldOperation()) def unhold(self): self._schedule_operation(UnholdOperation()) def end(self): self._schedule_operation(EndOperation()) def add_stream(self): raise NotImplementedError def remove_stream(self): raise NotImplementedError @property def id(self): return self._id @property def local_identity(self): return self._local_identity @property def remote_identity(self): return self._remote_identity @run_in_twisted_thread def _send_conference_info(self, xml): # This function is not meant for users to call, entities with knowledge about JingleSession # internals will call it, such as the MediaSessionHandler self._schedule_operation(SendConferenceInfoOperation(xml=xml)) def _send_stanza(self, stanza): if self.direction == 'incoming': stanza.jingle.initiator = unicode(self._remote_jid) stanza.jingle.responder = unicode(self._local_jid) else: stanza.jingle.initiator = unicode(self._local_jid) stanza.jingle.responder = unicode(self._remote_jid) stanza.timeout = self.jingle_stanza_timeout return self._protocol.request(stanza) def _fail(self, originator='local', reason='general-error', description=None): reason = jingle.Reason(jingle.ReasonType(reason), text=description) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' failure_str = '%s%s' % (reason, ' %s' % description if description else '') NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=failure_str)) self._channel.send_exception(proc.ProcExit) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self._current_operation is not None: self._current_operation.channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self._current_operation is not None: self._current_operation.channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self._current_operation is not None: self._current_operation.channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self._current_operation is not None: self._current_operation.channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: self.end() def _NH_XMPPGotJingleSessionAccept(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleSessionTerminate(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleSessionInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleDescriptionInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) def _NH_XMPPGotJingleTransportInfo(self, notification): self._schedule_operation(ProcessRemoteOperation(notification=notification)) # Operation handling @run_in_twisted_thread def _schedule_operation(self, operation): self._channel.send(operation) def _run(self): while True: self._current_operation = op = self._channel.wait() try: handler = getattr(self, '_OH_%s' % op.__class__.__name__) handler(op) except BaseException: self._proc = None raise finally: self._current_operation = None def _OH_AcceptOperation(self, operation): if self.state != 'incoming': return notification_center = NotificationCenter() settings = SIPSimpleSettings() streams = operation.streams for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._sdp_negotiator.current_remote local_ip = SIPConfig.local_ip.normalized local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) else: media = SDPMediaStream.new(media) media.port = 0 media.attributes = [] local_sdp.media.append(media) try: self._sdp_negotiator.set_local_answer(local_sdp) self._sdp_negotiator.negotiate() - except SIPCoreError, e: + except SIPCoreError as e: self._fail(originator='local', reason='incompatible-parameters', description=str(e)) return self.local_focus = operation.is_focus notification_center.post_notification('JingleSessionWillStart', sender=self) # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote # Build the payload and send it over payload = sdp_to_jingle(local_sdp) payload.sid = self._id if self.local_focus: payload.conference_info = jingle.ConferenceInfo(True) stanza = self._protocol.sessionAccept(self._local_jid, self._remote_jid, payload) d = self._send_stanza(stanza) block_on(d) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 - except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e: + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, IqTimeoutError): error = 'timeout sending IQ stanza' elif isinstance(e, StanzaError): error = str(e.condition) else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification_center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) def _OH_ConnectOperation(self, operation): if self.state is not None: return settings = SIPSimpleSettings() notification_center = NotificationCenter() self.direction = 'outgoing' self.state = 'connecting' self.proposed_streams = operation.streams self.local_focus = operation.is_focus self._id = random_id() self._local_identity = operation.sender self._remote_identity = operation.recipient self._local_jid = self._local_identity.uri.as_xmpp_jid() self._remote_jid = self._remote_identity.uri.as_xmpp_jid() notification_center.post_notification('JingleSessionNewOutgoing', self, NotificationData(streams=operation.streams)) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 # Build local SDP and negotiator local_ip = SIPConfig.local_ip.normalized local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) local_sdp.media.append(media) self._sdp_negotiator = SDPNegotiator.create_with_local_offer(local_sdp) # Build the payload and send it over payload = sdp_to_jingle(local_sdp) payload.sid = self._id if self.local_focus: payload.conference_info = jingle.ConferenceInfo(True) stanza = self._protocol.sessionInitiate(self._local_jid, self._remote_jid, payload) d = self._send_stanza(stanza) block_on(d) - except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e: + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, IqTimeoutError): error = 'timeout sending IQ stanza' elif isinstance(e, StanzaError): error = str(e.condition) elif isinstance(e, SIPCoreError): error = str(e) else: error = 'media stream failed: %s' % e.data.reason self.state = 'terminated' NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=error)) self._channel.send_exception(proc.ProcExit) else: self._timer = reactor.callLater(settings.sip.invite_timeout, self.end) def _OH_RejectOperation(self, operation): if self.state != 'incoming': return reason = jingle.Reason(jingle.ReasonType(operation.reason)) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' self._channel.send_exception(proc.ProcExit) def _OH_EndOperation(self, operation): if self.state not in ('connecting', 'connected'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None prev_state = self.state self.state = 'terminating' notification_center = NotificationCenter() notification_center.post_notification('JingleSessionWillEnd', self) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() if prev_state == 'connected': reason = jingle.Reason(jingle.ReasonType('success')) else: reason = jingle.Reason(jingle.ReasonType('cancel')) stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason) self._send_stanza(stanza) self.state = 'terminated' if prev_state == 'connected': self.end_time = datetime.now() notification_center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='local')) else: notification_center.post_notification('JingleSessionDidFail', self, NotificationData(originator='local', reason='cancel')) for stream in streams: stream.end() self._channel.send_exception(proc.ProcExit) def _OH_SendRingIndicationOperation(self, operation): if self.state != 'incoming': return stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('ringing')) self._send_stanza(stanza) def _OH_HoldOperation(self, operation): if self.state != 'connected': return if self.on_hold: return self.on_hold = True for stream in self.streams: stream.hold() stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('hold')) self._send_stanza(stanza) NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=False)) def _OH_UnholdOperation(self, operation): if self.state != 'connected': return if not self.on_hold: return self.on_hold = False for stream in self.streams: stream.unhold() stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('unhold')) self._send_stanza(stanza) NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) def _OH_SendConferenceInfoOperation(self, operation): if self.state != 'connected': return if not self.local_focus: return tree = etree.parse(StringIO(operation.xml)) tree.getroot().attrib['sid'] = self._id # FIXME: non-standard, but Jitsi does it data = etree.tostring(tree, xml_declaration=False) # Strip the XML heading stanza = jingle.ConferenceInfoIq(sender=self._local_jid, recipient=self._remote_jid, payload=data) stanza.timeout = self.jingle_stanza_timeout self._protocol.request(stanza) def _OH_ProcessRemoteOperation(self, operation): notification = operation.notification stanza = notification.data.stanza if notification.name == 'XMPPGotJingleSessionTerminate': if self.state not in ('incoming', 'connecting', 'connected_pending_accept', 'connected'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None # Session ended remotely prev_state = self.state self.state = 'terminated' if prev_state == 'incoming': reason = stanza.jingle.reason.value if stanza.jingle.reason else 'cancel' notification.center.post_notification('JingleSessionDidFail', self, NotificationData(originator='remote', reason=reason)) else: notification.center.post_notification('JingleSessionWillEnd', self, NotificationData(originator='remote')) streams = self.proposed_streams if prev_state == 'connecting' else self.streams for stream in streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.end_time = datetime.now() notification.center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='remote')) self._channel.send_exception(proc.ProcExit) elif notification.name == 'XMPPGotJingleSessionInfo': info = stanza.jingle.info if not info: return if info == 'ringing': if self.state not in ('connecting', 'connected_pending_accept'): return notification.center.post_notification('JingleSessionGotRingIndication', self) elif info in ('hold', 'unhold'): if self.state != 'connected': return notification.center.post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=info=='hold', partial=False)) elif notification.name == 'XMPPGotJingleDescriptionInfo': if self.state != 'connecting': return # Add candidates acquired on transport-info stanzas for s in self._pending_transport_info_stanzas: for c in s.jingle.content: content = next(content for content in stanza.jingle.content if content.name == c.name) content.transport.candidates.extend(c.transport.candidates) if isinstance(content.transport, jingle.IceUdpTransport): if not content.transport.ufrag and c.transport.ufrag: content.transport.ufrag = c.transport.ufrag if not content.transport.password and c.transport.password: content.transport.password = c.transport.password remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator.set_remote_answer(remote_sdp) self._sdp_negotiator.negotiate() except SIPCoreError: # The description-info stanza may have been just a parameter change, not a full 'SDP' return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None del self._pending_transport_info_stanzas[:] # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote notification.center.post_notification('JingleSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() try: with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 - except (MediaStreamDidFailError, api.TimeoutError), e: + except (MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected_pending_accept' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() # Hold the streams to prevent real RTP from flowing for stream in self.streams: stream.hold() elif notification.name == 'XMPPGotJingleSessionAccept': if self.state not in ('connecting', 'connected_pending_accept'): return if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None if self.state == 'connected_pending_accept': # We already negotiated ICE and media is 'flowing' (not really because streams are on hold) # unhold the streams and pretend the session just started for stream in self.streams: stream.unhold() self.state = 'connected' notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) return # Add candidates acquired on transport-info stanzas for s in self._pending_transport_info_stanzas: for c in s.jingle.content: content = next(content for content in stanza.jingle.content if content.name == c.name) content.transport.candidates.extend(c.transport.candidates) if isinstance(content.transport, jingle.IceUdpTransport): if not content.transport.ufrag and c.transport.ufrag: content.transport.ufrag = c.transport.ufrag if not content.transport.password and c.transport.password: content.transport.password = c.transport.password del self._pending_transport_info_stanzas[:] remote_sdp = jingle_to_sdp(stanza.jingle) try: self._sdp_negotiator.set_remote_answer(remote_sdp) self._sdp_negotiator.negotiate() - except SIPCoreError, e: + except SIPCoreError as e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', reason='incompatible-parameters', description=str(e)) return # Get active SDPs (negotiator may make changes) local_sdp = self._sdp_negotiator.active_local remote_sdp = self._sdp_negotiator.active_remote notification.center.post_notification('JingleSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification.center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() try: with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = operation.channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 - except (MediaStreamDidFailError, api.TimeoutError), e: + except (MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', reason='failed-application', description=error) else: self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams)) elif notification.name == 'XMPPGotJingleTransportInfo': if self.state != 'connecting': # ICE trickling not supported yet, so only accept candidates before accept return self._pending_transport_info_stanzas.append(stanza) class JingleSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='JingleSessionNewIncoming') notification_center.add_observer(self, name='JingleSessionNewOutgoing') notification_center.add_observer(self, name='JingleSessionDidFail') notification_center.add_observer(self, name='JingleSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='JingleSessionNewIncoming') notification_center.remove_observer(self, name='JingleSessionNewOutgoing') notification_center.remove_observer(self, name='JingleSessionDidFail') notification_center.remove_observer(self, name='JingleSessionDidEnd') def handle_notification(self, notification): if notification.name in ('JingleSessionNewIncoming', 'JingleSessionNewOutgoing'): session = notification.sender self.sessions[session.id] = session elif notification.name in ('JingleSessionDidFail', 'JingleSessionDidEnd'): session = notification.sender del self.sessions[session.id] diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py index 1cdd6cc..f66fb7d 100644 --- a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py +++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py @@ -1,439 +1,439 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19. """ __all__ = ['AudioStream'] from threading import RLock from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from zope.interface import implements from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError from sipsimple.streams.rtp import RTPStreamEncryption from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError class AudioStream(object): __metaclass__ = MediaStreamRegistrar implements(IMediaStream, IAudioPort, IObserver) type = 'audio' priority = 1 hold_supported = True def __init__(self): from sipsimple.application import SIPApplication self.mixer = SIPApplication.voice_audio_mixer self.bridge = AudioBridge(self.mixer) self.device = AudioDevice(self.mixer) self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = 'NULL' self._transport = None self._hold_request = None self._ice_state = 'NULL' self._lock = RLock() self._rtp_transport = None self.session = None self.encryption = RTPStreamEncryption(self) self._srtp_encryption = None self._try_ice = False self._initialized = False self._done = False self._failure_reason = None self.bridge.add(self.device) # Audio properties # @property def codec(self): return self._transport.codec if self._transport else None @property def consumer_slot(self): return self._transport.slot if self._transport else None @property def producer_slot(self): return self._transport.slot if self._transport and not self.muted else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None def _get_muted(self): return self.__dict__.get('muted', False) def _set_muted(self, value): if not isinstance(value, bool): raise ValueError('illegal value for muted property: %r' % (value,)) if value == self.muted: return old_producer_slot = self.producer_slot self.__dict__['muted'] = value notification_center = NotificationCenter() data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot) notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data) muted = property(_get_muted, _set_muted) del _get_muted, _set_muted # RTP properties # @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == 'IN_USE': return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None else: return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None @property def local_rtp_candidate_type(self): return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None @property def remote_rtp_candidate_type(self): return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None @property def ice_active(self): return self._ice_state == 'IN_USE' # Generic properties # @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote # Public methods # @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] if remote_stream.media != 'audio': raise UnknownStreamError if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'): raise InvalidStreamError('expected RTP/AVP or RTP/SAVP transport in audio stream, got %s' % remote_stream.transport) local_encryption_policy = 'sdes_optional' if local_encryption_policy == 'sdes_mandatory' and not 'crypto' in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") if remote_stream.transport == 'RTP/SAVP' and 'crypto' in remote_stream.attributes and local_encryption_policy not in ('opportunistic', 'sdes_optional', 'sdes_mandatory'): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError('no compatible codecs found') stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index if 'zrtp-hash' in remote_stream.attributes: stream._incoming_stream_encryption = 'zrtp' elif 'crypto' in remote_stream.attributes: stream._incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: stream._incoming_stream_encryption = None return stream def initialize(self, session, direction): with self._lock: if self.state != 'NULL': raise RuntimeError('AudioStream.initialize() may only be called in the NULL state') self.state = 'INITIALIZING' self.session = session local_encryption_policy = 'sdes_optional' if hasattr(self, '_incoming_remote_sdp'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates if self._incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = self._incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy del self._incoming_stream_encryption else: self._try_ice = True self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ['INITIALIZED', 'WAIT_ICE', 'ESTABLISHED']: raise RuntimeError('AudioStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states') if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = 'sendrecv' elif 'send' in old_direction: new_direction = ('sendonly' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'sendrecv') else: new_direction = ('inactive' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'recvonly') else: new_direction = None return self._transport.get_local_media(remote_sdp, index, new_direction) def start(self, local_sdp, remote_sdp, stream_index): with self._lock: if self.state != 'INITIALIZED': raise RuntimeError('AudioStream.start() may only be called in the INITIALIZED state') settings = SIPSimpleSettings() self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self._check_hold(self._transport.direction, True) if self._try_ice: self.state = 'WAIT_ICE' else: self.state = 'ESTABLISHED' self.notification_center.post_notification('MediaStreamDidStart', sender=self) def validate_update(self, remote_sdp, stream_index): with self._lock: # TODO: implement return True def update(self, local_sdp, remote_sdp, stream_index): with self._lock: connection = remote_sdp.media[stream_index].connection or remote_sdp.connection if not self._rtp_transport.ice_active and (connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port): settings = SIPSimpleSettings() old_consumer_slot = self.consumer_slot old_producer_slot = self.producer_slot self.notification_center.remove_observer(self, sender=self._transport) self._transport.stop() try: self._transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) - except SIPCoreError, e: + except SIPCoreError as e: self.state = 'ENDED' self._failure_reason = e.args[0] self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=self._failure_reason)) return self.notification_center.add_observer(self, sender=self._transport) self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True, old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv': self._transport.update_direction('recvonly') self._check_hold(self._transport.direction, False) self.notification_center.post_notification('RTPStreamDidChangeRTPParameters', sender=self) else: new_direction = local_sdp.media[stream_index].direction self._transport.update_direction(new_direction) self._check_hold(new_direction, False) self._hold_request = None def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == 'ESTABLISHED' and self.direction != 'inactive': self.bridge.remove(self) self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == 'ESTABLISHED' and self._hold_request == 'hold': self.bridge.add(self) self._hold_request = None if self._hold_request == 'hold' else 'unhold' def deactivate(self): with self._lock: self.bridge.stop() def end(self): with self._lock: if not self._initialized or self._done: return self._done = True self.notification_center.post_notification('MediaStreamWillEnd', sender=self) if self._transport is not None: self._transport.stop() self.notification_center.remove_observer(self, sender=self._transport) self._transport = None self.notification_center.remove_observer(self, sender=self._rtp_transport) self._rtp_transport = None self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) self.session = None def reset(self, stream_index): with self._lock: if self.direction == 'inactive' and not self.on_hold_by_local: new_direction = 'sendrecv' self._transport.update_direction(new_direction) self._check_hold(new_direction, False) # TODO: do a full reset, re-creating the AudioTransport, so that a new offer # would contain all codecs and ICE would be renegotiated -Saul def send_dtmf(self, digit): with self._lock: if self.state != 'ESTABLISHED': raise RuntimeError('AudioStream.send_dtmf() cannot be used in %s state' % self.state) try: self._transport.send_dtmf(digit) - except PJSIPError, e: + except PJSIPError as e: if not e.args[0].endswith('(PJ_ETOOMANY)'): raise # Notification handling # def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPTransportDidFail(self, notification): with self._lock: self.notification_center.remove_observer(self, sender=notification.sender) if self.state == 'ENDED': return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportDidInitialize(self, notification): settings = SIPSimpleSettings() rtp_transport = notification.sender with self._lock: if self.state == 'ENDED': return del self._rtp_args del self._stun_servers try: if hasattr(self, '_incoming_remote_sdp'): try: audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) finally: del self._incoming_remote_sdp del self._incoming_stream_index else: audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list)) - except SIPCoreError, e: + except SIPCoreError as e: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = audio_transport self.notification_center.add_observer(self, sender=audio_transport) self._initialized = True self.state = 'INITIALIZED' self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPAudioStreamGotDTMF(self, notification): self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit)) def _NH_RTPAudioTransportDidTimeout(self, notification): self.notification_center.post_notification('RTPStreamDidTimeout', sender=self) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != 'NULL' or self.state not in ('INITIALIZING', 'INITIALIZED', 'WAIT_ICE'): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'IN_USE' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != 'WAIT_ICE': return self._ice_state = 'FAILED' self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods # def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args['encryption'] = self._srtp_encryption self._rtp_args['use_ice'] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() rtp_transport = None try: rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) self.notification_center.add_observer(self, sender=rtp_transport) rtp_transport.set_INIT() - except SIPCoreError, e: + except SIPCoreError as e: if rtp_transport is not None: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = 'ENDED' self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _check_hold(self, direction, is_initial): was_on_hold_by_local = self.on_hold_by_local was_on_hold_by_remote = self.on_hold_by_remote was_inactive = self.direction == 'inactive' self.direction = direction inactive = self.direction == 'inactive' self.on_hold_by_local = was_on_hold_by_local if inactive else direction == 'sendonly' self.on_hold_by_remote = 'send' not in direction if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold': self.bridge.add(self) if not was_on_hold_by_local and self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=True)) if was_on_hold_by_local and not self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=False)) if not was_on_hold_by_remote and self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=True)) if was_on_hold_by_remote and not self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=False)) diff --git a/sylk/bonjour.py b/sylk/bonjour.py index b7918c8..4e948cb 100644 --- a/sylk/bonjour.py +++ b/sylk/bonjour.py @@ -1,256 +1,256 @@ import uuid from application import log from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib import api, coros, proc from eventlib.green import select from sipsimple.account.bonjour import _bonjour, BonjourPresenceState, BonjourRegistrationFile from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from threading import Lock from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount class RestartSelect(Exception): pass class BonjourService(object): implements(IObserver) def __init__(self, service='sipfocus', name='SylkServer', uri_user=None, is_focus=True): self.account = DefaultAccount() self.service = service self.name = name self.uri_user = uri_user self.is_focus = is_focus self.id = str(uuid.uuid4()) self._stopped = True self._files = [] self._command_channel = coros.queue() self._select_proc = None self._register_timer = None self._update_timer = None self._lock = Lock() self.__dict__['presence_state'] = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) self._activate() @run_in_green_thread def stop(self): self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def _activate(self): self._stopped = False self._command_channel.send(Command('register')) def _deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._stopped = True def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) def _get_presence_state(self): return self.__dict__['presence_state'] def _set_presence_state(self, state): if state is not None and not isinstance(state, BonjourPresenceState): raise ValueError("state must be a %s instance or None" % BonjourPresenceState.__name__) with self._lock: old_state = self.__dict__['presence_state'] self.__dict__['presence_state'] = state if state != old_state: call_in_twisted_thread(self.update_registrations) presence_state = property(_get_presence_state, _set_presence_state) del _get_presence_state, _set_presence_state def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self, data=NotificationData(name=name, transport=file.transport)) else: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if not self._stopped: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() for transport in missing_transports: notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport)) try: contact_uri = self.account.contact[transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') file = _bonjour.DNSServiceRegister(name=str(contact_uri), regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'), port=contact_uri.port, callBack=self._register_cb, txtRecord=_bonjour.TXTRecord(items=txtdata)) - except (_bonjour.BonjourError, KeyError), e: + except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact_uri = self.account.contact[file.transport] contact_uri.user = self.uri_user if self.is_focus: contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id) state = self.presence_state if state is not None: txtdata['state'] = state.state txtdata['note'] = state.note.encode('utf-8') _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) - except (_bonjour.BonjourError, KeyError), e: + except (_bonjour.BonjourError, KeyError) as e: notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self, data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.exception() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = self._files self._files = [] self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_NetworkConditionsDidChange(self, notification): if self._files: self.restart_registration() diff --git a/sylk/server.py b/sylk/server.py index 4a6cf6f..69583bc 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,256 +1,256 @@ import os import sys from threading import Event from uuid import uuid4 from application import log from application.notification import NotificationCenter from application.python import Null from application.system import makedirs from eventlib import proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from sipsimple.video import VideoDevice from twisted.internet import reactor # Load stream extensions needed for integration with SIP SIMPLE SDK import sylk.streams del sylk.streams from sylk.accounts import DefaultAccount from sylk.applications import IncomingRequestHandler from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import TraceLogManager from sylk.session import SessionManager from sylk.web import WebServer class SylkServer(SIPApplication): def __init__(self): self.request_handler = Null self.thor_interface = Null self.web_server = Null self.options = Null self.stopping_event = Event() self.stop_event = Event() self.failed = False def start(self, options): self.options = options if self.options.enable_bonjour: ServerConfig.enable_bonjour = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: super(SylkServer, self).start(MemoryStorage()) - except Exception, e: + except Exception as e: log.fatal('Error starting SIP Application: %s' % e) sys.exit(1) def _initialize_core(self): # SylkServer needs to listen for extra events and request types notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), enable_colorbar_device=True, # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events={'conference', 'presence'}, incoming_requests={'MESSAGE'}) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) @run_in_green_thread def _initialize_subsystems(self): notification_center = NotificationCenter() with self._lock: stop_pending = self._stop_pending if stop_pending: self.state = 'stopping' if stop_pending: notification_center.post_notification('SIPApplicationWillEnd', sender=self) reactor.stop() return account_manager = AccountManager() dns_manager = DNSManager() session_manager = SessionManager() settings = SIPSimpleSettings() # Initialize default account default_account = DefaultAccount() account_manager.default_account = default_account # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize video objects self.video_device = VideoDevice(u'Colorbar generator', settings.video.resolution, settings.video.framerate) # initialize instance id settings.instance_id = uuid4().urn settings.save() # initialize ZRTP cache makedirs(ServerConfig.spool_dir.normalized) self.engine.zrtp_cache = os.path.join(ServerConfig.spool_dir.normalized, 'zrtp.db') # initialize middleware components dns_manager.start() account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) # start SylkServer components self.web_server = WebServer() self.web_server.start() self.request_handler = IncomingRequestHandler() self.request_handler.start() if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode self.thor_interface = ConferenceNode() thor_roles = [] if 'conference' in self.request_handler.application_registry: thor_roles.append('conference_server') if 'xmppgateway' in self.request_handler.application_registry: thor_roles.append('xmpp_gateway') if 'webrtcgateway' in self.request_handler.application_registry: thor_roles.append('webrtc_gateway') self.thor_interface.start(thor_roles) @run_in_green_thread def _shutdown_subsystems(self): dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() # terminate all sessions p = proc.spawn(session_manager.stop) p.wait() # shutdown SylkServer components procs = [proc.spawn(self.web_server.stop), proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)] proc.waitall(procs) # shutdown other middleware components procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop)] proc.waitall(procs) # shutdown engine self.engine.stop() self.engine.join(timeout=5) # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _NH_AudioDevicesDidChange(self, notification): pass def _NH_DefaultAudioDeviceDidChange(self, notification): pass def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal('Could not set TLS options: %s' % notification.data.error) sys.exit(1) def _NH_SIPApplicationWillStart(self, notification): tracelog_manager = TraceLogManager() tracelog_manager.start() def _NH_SIPApplicationDidStart(self, notification): settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.info('SylkServer started, listening on:') for transport in settings.sip.transport_list: try: log.info(' %s:%d (%s)' % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): self.stopping_event.set() def _NH_SIPApplicationDidEnd(self, notification): log.info('SIP application ended') tracelog_manager = TraceLogManager() tracelog_manager.stop() if not self.stopping_event.is_set(): log.warning('SIP application ended without shutting down all subsystems') self.stopping_event.set() self.stop_event.set() def _NH_SIPApplicationGotFatalError(self, notification): log.error('An exception occurred within the SIP core:\n%s\n' % notification.data.traceback) self.failed = True def _NH_SIPEngineDidFail(self, notification): log.error('SIP engine failed') self.failed = True super(SylkServer, self)._NH_SIPEngineDidFail(notification) def _NH_ThorNetworkGotFatalError(self, notification): log.error("All Thor Event Servers have unrecoverable errors.") diff --git a/sylk/session.py b/sylk/session.py index c336447..0f52841 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,1978 +1,1978 @@ import random from threading import RLock from time import time from application import log from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton from application.system import host from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, SIPCoreInvalidStateError, PJSIPError, sip_status_messages from sipsimple.core import ContactHeader, RouteHeader, FromHeader, ToHeader, ReasonHeader, WarningHeader from sipsimple.core import SIPURI, SDPConnection, SDPSession, SDPMediaStream from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.configuration import SIPConfig class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class IllegalStateError(RuntimeError): pass @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() - except DNSLookupError, e: + except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break - except SIPSubscriptionDidFail, e: + except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) - except InterruptSubscription, e: + except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass - except TerminateSubscription, e: + except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) - except SubscriptionError, e: + except SubscriptionError as e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class Session(object): implements(IObserver) media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): remote_sdp = invitation.sdp.proposed_remote if not remote_sdp: invitation.send_response(488) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if not self.proposed_streams: invitation.send_response(488) return if 'Replaces' in data.headers: invitation.send_response(403) return self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, from_header, to_header, route, streams, is_focus=False, contact_header=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = route self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] - except KeyError, e: + except KeyError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) if SIPConfig.advertised_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.advertised_ip.normalized elif SIPConfig.local_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.local_ip.normalized else: local_ip = contact_header.uri.host connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) route_header = RouteHeader(self.route.uri) if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, ) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) - except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason else: code = getattr(e.data, 'code', 0) reason = getattr(e.data, 'reason', 'Session disconnected') if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None - except SIPCoreError, e: + except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and ad self.state = 'sending_proposal' self.greenlet = api.getcurrent() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') self.proposed_streams = streams wait_count = len(self.proposed_streams) try: while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote if SIPConfig.advertised_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.advertised_ip.normalized elif SIPConfig.local_ip not in (None, '0.0.0.0'): local_ip = SIPConfig.local_ip.normalized else: sdp_connection = remote_sdp.connection or next(media.connection for media in remote_sdp.media if media.connection is not None) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[self._invitation.transport] contact_header.uri = local_contact_uri except KeyError: pass if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) - except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) - except SIPCoreError, e: + except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) - except SIPCoreError, e: + except SIPCoreError as e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' self.proposed_streams = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream becuase it couldn't # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except api.TimeoutError: self._fail_proposal(originator='remote', error='media stream timed out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e)) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) - except SIPCoreError, e: + except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.streams + streams proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': break - except SIPCoreError, e: + except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if notification.data.code >= 300: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except api.TimeoutError: self._fail_proposal(originator='local', error='media stream timed out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e)) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) - except SIPCoreError, e: + except SIPCoreError as e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 487: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break - except SIPCoreError, e: + except SIPCoreError as e: proposed_streams = (self.proposed_streams or [])[:] for stream in proposed_streams: stream.deactivate() stream.end() self.greenlet = None self.state = 'connected' self.proposed_streams = None notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None or self._invitation.state is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break - except SIPCoreError, e: + except SIPCoreError as e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None @property def call_id(self): return self._invitation.call_id if self._invitation is not None else None @property def request_uri(self): return self._invitation.request_uri if self._invitation is not None else None def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError: pass - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400 <= code <= 699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': break except (api.TimeoutError, SIPCoreError): pass notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() has_streams = bool(self.proposed_streams) for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if has_streams else 500) except SIPCoreError: pass notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: # TODO pass elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) - except InvitationDisconnectedError, e: + except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: self.state = 'connected' proposed_streams = self.proposed_streams[:] self.proposed_streams = None notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._invitation.notify_transfer_progress(500) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() class SessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: account = DefaultAccount() notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification) diff --git a/sylk/streams.py b/sylk/streams.py index a82badd..1e9bb99 100644 --- a/sylk/streams.py +++ b/sylk/streams.py @@ -1,382 +1,382 @@ import random from collections import defaultdict from functools import partial from application.notification import NotificationCenter, NotificationData from eventlib import api from eventlib.coros import queue from eventlib.proc import spawn, ProcExit from msrplib.connect import DirectConnector, DirectAcceptor from msrplib.protocol import URI, FailureReportHeader, SuccessReportHeader, UseNicknameHeader from msrplib.session import contains_mime_type, MSRPSession from msrplib.transport import make_response from sipsimple.core import SDPAttribute from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase as _MSRPStreamBase, MSRPStreamError, NotificationProxyLogger from sipsimple.streams.msrp.chat import ChatStream as _ChatStream, ChatStreamError, ChatIdentity, Message, QueuedMessage, CPIMPayload, CPIMParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from sylk.configuration import SIPConfig @run_in_green_thread def MSRPStreamBase_initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.session.account.msrp.transport outgoing = direction=='outgoing' logger = NotificationProxyLogger() if self.session.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif not outgoing: if self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' else: # outgoing self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(local_uri=URI(host=SIPConfig.local_ip.normalized, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials)) self.local_media = self._create_local_media(full_local_path) - except Exception, e: + except Exception as e: notification_center.post_notification('MediaStreamDidNotInitialize', self, NotificationData(reason=str(e))) else: self._initialized = True notification_center.post_notification('MediaStreamDidInitialize', self) finally: self.greenlet = None # Monkey-patch the initialize method (needed because we want every MSRP based stream to behave this way, including file transfers) # _MSRPStreamBase.initialize = MSRPStreamBase_initialize class ChatStream(_MSRPStreamBase): type = 'chat' priority = _ChatStream.priority + 1 msrp_session_class = MSRPSession media_type = 'message' accept_types = ['message/cpim'] accept_wrapped_types = ['*'] supported_chatroom_capabilities = ['nickname', 'private-messages', 'com.ag-projects.screen-sharing', 'com.ag-projects.zrtp-sas'] def __init__(self): super(ChatStream, self).__init__(direction='sendrecv') self.message_queue = queue() self.sent_messages = set() self.incoming_queue = defaultdict(list) self.message_queue_thread = None @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] if remote_stream.media != 'message': raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport: raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != ['*']: raise InvalidStreamError("wrong format list specified") stream = cls() stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') if remote_stream.direction != 'sendrecv': raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction) remote_accept_types = remote_stream.attributes.getfirst('accept-types') if remote_accept_types is None: raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute") if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.split()): raise InvalidStreamError("no compatible media types found") return stream @property def local_identity(self): try: return ChatIdentity(self.session.local_identity.uri, self.session.account.display_name) except AttributeError: return None @property def remote_identity(self): try: return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) except AttributeError: return None @property def private_messages_allowed(self): return 'private-messages' in self.chatroom_capabilities @property def nickname_allowed(self): return 'nickname' in self.chatroom_capabilities @property def chatroom_capabilities(self): try: if self.session.local_focus: return ' '.join(self.local_media.attributes.getall('chatroom')).split() elif self.session.remote_focus: return ' '.join(self.remote_media.attributes.getall('chatroom')).split() except AttributeError: pass return [] def _NH_MediaStreamDidStart(self, notification): self.message_queue_thread = spawn(self._message_queue_handler) def _NH_MediaStreamDidNotInitialize(self, notification): message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _NH_MediaStreamDidEnd(self, notification): if self.message_queue_thread is not None: self.message_queue_thread.kill() else: message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _create_local_media(self, uri_path): local_media = super(ChatStream, self)._create_local_media(uri_path) if self.session.local_focus and self.supported_chatroom_capabilities: local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.supported_chatroom_capabilities))) return local_media def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. if chunk.message_id in self.sent_messages: self.sent_messages.remove(chunk.message_id) notification_center = NotificationCenter() data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment) if chunk.status.code == 200: notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data) else: notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_SEND(self, chunk): # This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return if self.direction == 'sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.content_type.lower() != 'message/cpim': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return if chunk.contflag == '#': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 200, 'OK') return elif chunk.contflag == '+': self.incoming_queue[chunk.message_id].append(chunk.data) self.msrp_session.send_report(chunk, 200, 'OK') return else: data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data try: payload = CPIMPayload.decode(data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return message = Message(**{name: getattr(payload, name) for name in Message.__slots__}) if not contains_mime_type(self.accept_wrapped_types, message.content_type): self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return if message.timestamp is None: message.timestamp = ISOTimestamp.now() if message.sender is None: message.sender = self.remote_identity if payload.charset is not None: message.content = message.content.decode(payload.charset) private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: try: data = IsComposingDocument.parse(message.content) except ParserError as e: self.msrp_session.send_report(chunk, 400, str(e)) return ndata = NotificationData(state=data.state.value, refresh=data.refresh.value if data.refresh is not None else 120, content_type=data.content_type.value if data.content_type is not None else None, last_active=data.last_active.value if data.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata) else: ndata = NotificationData(message=message, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotMessage', self, ndata) def _handle_NICKNAME(self, chunk): nickname = chunk.headers['Use-Nickname'].decoded NotificationCenter().post_notification('ChatStreamGotNicknameRequest', self, NotificationData(nickname=nickname, chunk=chunk)) def _on_transaction_response(self, message_id, response): if message_id in self.sent_messages and response.code != 200: self.sent_messages.remove(message_id) data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment) NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _on_nickname_transaction_response(self, message_id, response): notification_center = NotificationCenter() if response.code == 200: notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response)) else: notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)) def _message_queue_handler(self): notification_center = NotificationCenter() try: while True: message = self.message_queue.wait() if self.msrp_session is None: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) break try: if isinstance(message.content, unicode): message.content = message.content.encode('utf8') charset = 'utf8' else: charset = None message.sender = message.sender or self.local_identity message.recipients = message.recipients or [self.remote_identity] message.timestamp = message.timestamp or ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) - except ChatStreamError, e: + except ChatStreamError as e: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0]) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) continue else: content, content_type = payload.encode() message_id = message.id notify_progress = message.notify_progress report = 'yes' if notify_progress else 'no' chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id) chunk.add_header(FailureReportHeader(report)) chunk.add_header(SuccessReportHeader(report)) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id)) - except Exception, e: + except Exception as e: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e)) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) except ProcExit: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) raise else: if notify_progress: self.sent_messages.add(message_id) notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk)) finally: self.message_queue_thread = None while self.sent_messages: message_id = self.sent_messages.pop() data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) @run_in_twisted_thread def _enqueue_message(self, message): if self._done: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) else: self.message_queue.send(message) @run_in_green_thread def _send_nickname_response(self, chunk, code, reason): response = make_response(chunk, code, reason) try: self.msrp_session.send_chunk(response) except Exception: pass def accept_nickname(self, chunk): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) self._send_nickname_response(chunk, 200, 'OK') def reject_nickname(self, chunk, code, reason): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) self._send_nickname_response(chunk, code, reason) def send_message(self, content, content_type='text/plain', sender=None, recipients=None, timestamp=None, additional_headers=None, message_id=None, notify_progress=True): message = QueuedMessage(content, content_type, sender=sender, recipients=recipients, timestamp=timestamp, additional_headers=additional_headers, id=message_id, notify_progress=notify_progress) self._enqueue_message(message) return message.id def send_composing_indication(self, state, refresh=None, last_active=None, sender=None, recipients=None, message_id=None, notify_progress=False): content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text')) message = QueuedMessage(content, IsComposingDocument.content_type, sender=sender, recipients=recipients, id=message_id, notify_progress=notify_progress) self._enqueue_message(message) return message.id @run_in_green_thread def _set_local_nickname(self, nickname, message_id): if self.msrp_session is None: # should we generate ChatStreamDidNotSetNickname here? return chunk = self.msrp.make_request('NICKNAME') chunk.add_header(UseNicknameHeader(nickname or u'')) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id)) - except Exception, e: + except Exception as e: self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason)) def set_local_nickname(self, nickname): if not self.nickname_allowed: raise ChatStreamError('Setting nickname is not supported') message_id = '%x' % random.getrandbits(64) self._set_local_nickname(nickname, message_id) return message_id diff --git a/sylk/tls.py b/sylk/tls.py index 6106ae1..f6dc369 100644 --- a/sylk/tls.py +++ b/sylk/tls.py @@ -1,50 +1,50 @@ """TLS helper classes""" __all__ = ['Certificate', 'PrivateKey'] from gnutls.crypto import X509Certificate, X509PrivateKey from application import log from application.process import process def file_content(file): path = process.config_file(file) if path is None: raise Exception("File '%s' does not exist" % file) try: f = open(path, 'rt') except Exception: raise Exception("File '%s' could not be open" % file) try: return f.read() finally: f.close() class Certificate(object): """Configuration data type. Used to create a gnutls.crypto.X509Certificate object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509Certificate(file_content(value)) - except Exception, e: + except Exception as e: log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError('value should be a string') class PrivateKey(object): """Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object from a file given in the configuration file.""" def __new__(cls, value): if isinstance(value, basestring): try: return X509PrivateKey(file_content(value)) - except Exception, e: + except Exception as e: log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e))) return None else: raise TypeError('value should be a string')