diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index 4a622bc..5dda11b 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,436 +1,438 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import mimetypes import os import re import shutil from application.notification import IObserver, NotificationCenter from application.python import Null from gnutls.interfaces.twisted import X509Credentials from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.configuration.settings import SIPSimpleSettings -from sipsimple.core import Engine, SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader +from sipsimple.core import Engine, SIPURI, SIPCoreError +from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup from sipsimple.session import IllegalStateError from sipsimple.streams import AudioStream from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications import SylkApplication from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.applications.conference.room import Room from sylk.applications.conference.web import ScreenSharingWebServer from sylk.bonjour import BonjourServices from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.extensions import ChatStream from sylk.session import Session from sylk.tls import Certificate, PrivateKey class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self._rooms = {} self.invited_participants_map = {} self.bonjour_focus_service = Null self.bonjour_room_service = Null self.screen_sharing_web_server = None def start(self): # cleanup old files for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir): try: shutil.rmtree(path) except EnvironmentError: pass if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference': self.bonjour_focus_service = BonjourServices(service='sipfocus') self.bonjour_focus_service.start() log.msg("Bonjour publication started for service 'sipfocus'") self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference') self.bonjour_room_service.start() self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants') log.msg("Bonjour publication started for service 'sipuri'") self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir) if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None: cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized) key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized) credentials = X509Credentials(cert, key) else: credentials = None self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials) listen_address = self.screen_sharing_web_server.listener.getHost() log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port)) def stop(self): self.bonjour_focus_service.stop() self.bonjour_room_service.stop() self.screen_sharing_web_server.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported') session.reject(488) return try: self.validate_acl(session._invitation.request_uri, session.remote_identity.uri) except ACLValidationError: log.msg(u'Session rejected: unauthorized by access list') session.reject(403) return # Check if requested files belong to this room for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'): try: room = self.get_room(session._invitation.request_uri) except RoomNotFoundError: log.msg(u'Session rejected: room not found') session.reject(404) return try: file = next(file for file in room.files if file.hash == stream.file_selector.hash) except StopIteration: log.msg(u'Session rejected: requested file not found') session.reject(404) return filename = os.path.basename(file.name) for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)): if filename in filenames: path = os.path.join(dirpath, filename) stream.file_selector.fd = open(path, 'r') if stream.file_selector.size is None: stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size if stream.file_selector.type is None: mime_type, encoding = mimetypes.guess_type(filename) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' stream.file_selector.type = type break else: # File got removed from the filesystem log.msg(u'Session rejected: requested file removed from the filesystem') session.reject(404) return NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if subscribe_request.event != 'conference': log.msg(u'Subscription rejected: only conference event is supported') subscribe_request.reject(489) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): log.msg(u'Subscription rejected: unauthorized by access list') subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: log.msg(u'Subscription rejected: room not yet created') subscribe_request.reject(480) return if not room.started: log.msg(u'Subscription rejected: room not started yet') subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri)) try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri) refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_message(self, message_request, data): log.msg(u'SIP MESSAGE is not supported, use MSRP media instead') message_request.answer(405) def accept_session(self, session, streams): if session.state == 'incoming': try: session.accept(streams, is_focus=True) except IllegalStateError: pass def add_participant(self, session, room_uri): # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri)) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 NotificationCenter().add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri)) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender room = self.get_room(session._invitation.request_uri, True) # FIXME room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) if session.direction == 'incoming': room_uri = session._invitation.request_uri # FIXME else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) log.msg(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason)) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.request_uri self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host) self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'INVITE': self._refer_request.accept() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'BYE': log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = DefaultAccount() conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return else: active_media = room.active_media if not active_media: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return if 'audio' in active_media: self.streams.append(AudioStream()) if 'chat' in active_media: self.streams.append(ChatStream()) self.session = Session(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) - subject = u'Join conference request from %s' % original_identity - self.session.connect(from_header, to_header, contact_header=contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, subject=subject, extra_headers=extra_headers) + extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity)) + route = notification.data.result[0] + self.session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) log.msg('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 1701e00..01f07ca 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1146 +1,1146 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import hashlib import os import random import re import shutil import string import weakref from collections import defaultdict, deque from glob import glob from itertools import chain, count, cycle from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.system import makedirs from eventlib import api, coros, proc from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.application import SIPApplication from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPCoreError, SIPCoreInvalidStateError, SIPURI -from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader +from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.session import IllegalStateError from sipsimple.streams import FileTransferStream from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.streams.msrp import ChatStreamError, FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.bonjour import BonjourServices from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.datatypes import ResourcePath, URL from sylk.session import Session def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s <%s@%s>' % (identity.display_name, uri.user, uri.host) else: return u'%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.room_uri = room.uri self.sender = sender self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) from sylk.applications.conference import ConferenceApplication port = ConferenceApplication().screen_sharing_web_server.port scheme = 'https' if ConferenceConfig.screen_sharing_use_https else 'http' host = ConferenceConfig.screen_sharing_hostname or ConferenceConfig.screen_sharing_ip.normalized self.url = URL('%s://%s:%s/' % (scheme, host, port)) self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename)) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError, e: log.msg('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url) room.dispatch_server_message(txt) log.msg(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender) room.dispatch_server_message(txt) log.msg(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.config = get_room_config(uri) self.uri = uri self.identity = CPIMIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room') self.files = [] self.screen_images = {} self.sessions = [] self.subscriptions = [] self.transfer_handlers = weakref.WeakSet() self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.conference_info_version = count(1) self.bonjour_services = Null self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = defaultdict(lambda: 0) self.history = deque(maxlen=ConferenceConfig.history_size) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) @property def conference_info(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) conference_description.conf_uris = conference.ConfUris() conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation')) if self.config.advertise_xmpp_support: conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation')) # TODO: add grouptextchat service uri for number in self.config.pstn_access_numbers: conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation')) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len(self.participants_counter.keys()) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) user = conference.User(str(session.remote_identity.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def start(self): if self.started: return if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference': room_user = self.identity.uri.user self.bonjour_services = BonjourServices(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None [handler.stop() for handler in self.transfer_handlers] notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screen_sharing_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: continue if message.body.startswith('?OTR:'): continue if message.timestamp is None: message.timestamp = ISOTimestamp.utcnow() message.sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), message.sender.display_name) recipient = message.recipients[0] private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_message(session, message) else: self.history.append(message) self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: continue recipient = data.recipients[0] private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching private message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_iscomposing(self, session, data): for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue identity = CPIMIdentity(session.remote_identity.uri, session.remote_identity.display_name) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_iscomposing(self, session, data): recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue identity = CPIMIdentity(session.remote_identity.uri, session.remote_identity.display_name) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity) except ChatStreamError, e: log.error(u'Error dispatching private composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) def dispatch_conference_info(self): data = self.conference_info for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = file.sender.uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = OutgoingFileTransferHandler(self, uri, file) self.transfer_handlers.add(handler) handler.start() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Room %s - audio stream %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) try: transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: if transfer_stream.direction == 'recvonly': transfer_handler = IncomingFileTransferHandler(self, session) transfer_handler.start() txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size)) else: transfer_handler = OutgoingFileTransferRequestHandler(self, session) transfer_handler.start() txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), transfer_stream.file_selector.name.decode('utf-8')) log.msg(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) else: log.msg(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] <= 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.msg(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session))) if not self.sessions: log.msg(u'Room %s - Last participant left conference' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def handle_incoming_subscription(self, subscribe_request, data): log.msg('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri)) if subscribe_request.event != 'conference': log.msg('Room %s - Subscription rejected: only conference event is supported' % self.uri) subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info) def _accept_proposal(self, session, streams): try: session.accept_proposal(streams) except IllegalStateError: pass session.proposal_timer = None def add_file(self, file): if file.status == 'INCOMPLETE': self.dispatch_server_message('%s has cancelled upload of file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size))) else: self.dispatch_server_message('%s has uploaded file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) def _update_bonjour_presence(self): num = len(self.sessions) if num == 0: num_str = 'No' elif num == 1: num_str = 'One' elif num == 2: num_str = 'Two' else: num_str = str(num) txt = u'%s participant%s' % (num_str, '' if num==1 else 's') presence_state = BonjourPresenceState('available', txt) if self.bonjour_services is Null: # This is the room being published all the time from sylk.applications.conference import ConferenceApplication ConferenceApplication().bonjour_room_service.presence_state = presence_state else: self.bonjour_services.presence_state = presence_state @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream.session log.msg(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity))) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith('text/'): self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': self.add_screen_image(message.sender, message.body) def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and (session not in self.session_nickname_map or self.session_nickname_map[session] != nickname): notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender try: self.subscriptions.remove(subscription) except ValueError: pass else: notification.center.remove_observer(self, sender=subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity))) else: log.msg(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity))) self.dispatch_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(3, self._accept_proposal, session, streams) old_timer = getattr(session, 'proposal_timer', None) assert old_timer is None session.proposal_timer = timer def _NH_SIPSessionProposalRejected(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) assert timer is not None timer.cancel() session.proposal_timer = None def _NH_SIPSessionHadProposalFailure(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) assert timer is not None timer.cancel() session.proposal_timer = None def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type) log.msg(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': log.msg(u'Room %s - audio stream %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate, 'encrypted' if stream.srtp_active else 'unencrypted', stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type) log.msg(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() if not session.streams: log.msg(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity))) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): log.msg(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri) notification.sender.reject_transfer(403) def _NH_SIPSessionWillEnd(self, notification): session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None and timer.isActive(): timer.cancel() session.proposal_timer = None @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20) self.paused = True self.conference.bridge.add(self._player) NotificationCenter().add_observer(self, sender=self._player) def stop(self): if self._player is None: return NotificationCenter().remove_observer(self, sender=self._player) self._player.stop() self.paused = True self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() def _play_next_file(self): self._player.filename = next(self.files) self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.proc = None @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.render_chat_welcome() self.proc = proc.spawn(self.play_audio_welcome, welcome_prompt) self.proc.wait() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None self.proc = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio') except StopIteration: return player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50) audio_stream.bridge.add(player) try: if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self.play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is_one.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self.play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self.play_file_in_player(player, file, 0) file = ResourcePath('sounds/connected_tone.wav').normalized self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: audio_stream.bridge.remove(player) self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() finally: player.stop() def render_chat_welcome(self): try: chat_stream = next(stream for stream in self.session.streams if stream.type == 'chat') except StopIteration: return txt = 'Welcome to SylkServer!' user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)])) if user_count == 0: txt += ' You are the first participant' else: if user_count == 1: txt += ' There is one more participant' else: txt += ' There are %s more participants' % user_count txt += ' in this conference room.' if not ServerConfig.enable_bonjour: if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers: txt += '\n\nOther participants can join at these addresses:\n\n' if self.room.config.pstn_access_numbers: if len(self.room.config.pstn_access_numbers) == 1: nums = self.room.config.pstn_access_numbers[0] else: nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1] txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums if self.room.config.advertise_xmpp_support: txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri chat_stream.send_message(txt, 'text/plain', local_identity=self.room.identity, recipients=[self.room.identity]) for msg in self.room.history: chat_stream.send_message(msg.body, msg.content_type, local_identity=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.proc.kill() class RoomFile(object): def __init__(self, name, hash, size, sender, status): self.name = name self.hash = hash self.size = size self.sender = sender self.status = status @property def file_selector(self): return FileSelector.for_file(self.name.encode('utf-8'), hash=self.hash) class IncomingFileTransferHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.room_uri = room.uri self.session = session self.stream = next(stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly') self.error = False self.ended = False self.file = None self.file_selector = None self.filename = None self.hash = None self.status = None self.timer = None self.transfer_finished = False def start(self): self.file_selector = self.stream.file_selector path = os.path.join(ConferenceConfig.file_transfer_dir, self.room_uri) makedirs(path) self.filename = filename = os.path.join(path, self.file_selector.name.decode('utf-8')) basename, ext = os.path.splitext(filename) i = 1 while os.path.exists(filename): filename = '%s_%d%s' % (basename, i, ext) i += 1 self.filename = filename try: self.file = open(self.filename, 'wb') except EnvironmentError: log.msg('Room %s - cannot write destination filename: %s' % (self.room_uri, self.filename)) self.session.end() return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) self.hash = hashlib.sha1() @run_in_thread('file-transfer') def write_chunk(self, data): notification_center = NotificationCenter() if data is not None: try: self.file.write(data) except EnvironmentError, e: notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e))) else: self.hash.update(data) else: self.file.close() if self.error: notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self) else: notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self) @run_in_thread('file-io') def remove_bogus_file(self, filename): try: os.unlink(filename) except OSError: pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidEnd(self, notification): self.ended = True if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification.center.remove_observer(self, sender=self.stream) notification.center.remove_observer(self, sender=self.session) # Mark end of write operation self.write_chunk(None) def _NH_FileTransferStreamGotChunk(self, notification): self.write_chunk(notification.data.content) def _NH_FileTransferStreamDidFinish(self, notification): self.transfer_finished = True if self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerGotError(self, notification): log.error('Error while handling incoming file transfer: %s' % notification.data.error) self.error = True self.status = notification.data.error if not self.ended and self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerDidEnd(self, notification): notification.center.remove_observer(self, sender=self) remote_hash = self.file_selector.hash if not self.transfer_finished: log.msg('File transfer of %s cancelled' % os.path.basename(self.filename)) self.remove_bogus_file(self.filename) self.status = 'INCOMPLETE' else: local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper())) if local_hash != remote_hash: log.warning('Hash of transferred file does not match the remote hash (file may have changed).') self.status = 'Hash missmatch' self.remove_bogus_file(self.filename) else: self.status = 'OK' sender = CPIMIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) file = RoomFile(self.filename, remote_hash, self.file_selector.size, sender, self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None def _NH_IncomingFileTransferHandlerDidFail(self, notification): notification.center.remove_observer(self, sender=self) sender = CPIMIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, sender, self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None class OutgoingFileTransferRequestHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.session = session self.stream = next(stream for stream in self.session.streams if stream.type == 'file-transfer') self.timer = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd class InterruptFileTransfer(Exception): pass class OutgoingFileTransferHandler(object): implements(IObserver) def __init__(self, room, destination, file): self.room_uri = room.identity.uri self.destination = destination self.file = file self.session = None self.stream = None self.timer = None @run_in_green_thread def start(self): self.greenlet = api.getcurrent() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(self.destination) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: return notification_center = NotificationCenter() self.session = Session(account) self.stream = FileTransferStream(self.file.file_selector, 'sendonly') notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) - subject = u'File uploaded by %s' % self.file.sender from_header = FromHeader(SIPURI.new(self.room_uri), room.config.display_name) to_header = ToHeader(SIPURI.new(self.destination)) transport = routes[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(self.file.sender.uri))) - self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, subject=subject, extra_headers=extra_headers) + extra_headers.append(SubjectHeader(u'File uploaded by %s' % self.file.sender)) + self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, extra_headers=extra_headers) def stop(self): if self.session is not None: self.session.end() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index 151b3e1..3fffa22 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,451 +1,451 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlib import coros from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.extensions import ChatStream from sylk.session import Session __all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) NotificationCenter().add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) def _get_xmpp_session(self): return self.__dict__['xmpp_session'] xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat') notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return self.msrp_stream = ChatStream() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) - self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream]) + self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self) else: notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started')) def enqueue_xmpp_message(self, message): if self.started: raise RuntimeError('session is already started') self._xmpp_message_queue.append(message) def _send_queued_messages(self): if self._xmpp_message_queue: while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = CPIMIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _inactivity_timeout(self): log.msg("Ending SIP session %s due to inactivity" % self.sip_session._invitation.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client # replies to one of our messages. MESSAGE requests don't need a Contact header, though # so how should we communicate our GRUU to the recipient? self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.uri) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index a6b617c..1cabae7 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,333 +1,333 @@ # Copyright (C) 2013 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.audio import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import ContactHeader, FromHeader, ToHeader from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.configuration import SIPConfig from sylk.session import Session __all__ = ['MediaSessionHandler'] class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side request_uri = session._invitation.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): proposed_stream_types = set([stream.type for stream in session.proposed_streams]) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry().get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) - self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=streams) + self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed (%s)" % (notification.sender._invitation.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() def _NH_SIPSessionGotConferenceInfo(self, notification): self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) def _NH_JingleSessionDidStart(self, notification): log.msg("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.msg("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.msg("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py index 07a9abf..ed8fac2 100644 --- a/sylk/applications/xmppgateway/muc.py +++ b/sylk/applications/xmppgateway/muc.py @@ -1,470 +1,470 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import random import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.msrp import ChatStreamError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS from sylk.configuration import SIPConfig from sylk.extensions import ChatStream from sylk.session import Session class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class MucInvitationFailure(object): def __init__(self, code, reason): self.code = code self.reason = reason def __str__(self): return '%s (%s)' % (self.code, self.reason) class X2SMucInvitationHandler(object): implements(IObserver) def __init__(self, sender, recipient, participant): self.sender = sender self.recipient = recipient self.participant = participant self.active = False self.route = None self._channel = coros.queue() self._referral = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() sender_uri = self.sender.uri.as_sip_uri() recipient_uri = self.recipient.uri.as_sip_uri() participant_uri = self.participant.uri.as_sip_uri() try: # Lookup routes account = DefaultAccount() if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(recipient_uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise ReferralError(error='DNS lookup failed: %s' % e) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) refer_to_header = ReferToHeader(str(participant_uri)) refer_to_header.parameters['method'] = 'INVITE' referral = Referral(recipient_uri, FromHeader(sender_uri), ToHeader(recipient_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail, e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail, e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) finally: self.active = False except ReferralError, e: self._failure = MucInvitationFailure(e.code, e.error) finally: notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._referral = None if self._failure is not None: notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self) def _refresh(self): account = DefaultAccount() transport = self.route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self._referral.refresh(contact_header=contact_header, timeout=2) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() class S2XMucInvitationHandler(object): implements(IObserver) def __init__(self, session, sender, recipient, inviter): self.session = session self.sender = sender self.recipient = recipient self.inviter = inviter self._timer = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._timer = reactor.callLater(90, self._timeout) notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self) def stop(self): if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None notification_center = NotificationCenter() if self.session is not None: notification_center.remove_observer(self, sender=self.session) reactor.callLater(5, self._end_session, self.session) self.session = None if self._failure is not None: notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self) def _end_session(self, session): try: session.end(480) except Exception: pass def _timeout(self): NotificationCenter().remove_observer(self, sender=self.session) try: self.session.end(408) except Exception: pass self.session = None self._failure = MucInvitationFailure('Timeout', 408) self.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidFail(self, notification): notification.center.remove_observer(self, sender=self.session) self.session = None self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code) self.stop() class X2SMucHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity, nickname): self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.nickname = nickname self._xmpp_muc_session = None self._sip_session = None self._msrp_stream = None self._first_stanza = None self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza self._participants = set() # set of (URI, nickname) tuples self.ended = False def start(self): notification_center = NotificationCenter() self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session.start() notification_center.post_notification('X2SMucHandlerDidStart', sender=self) self._start_sip_session() def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_muc_session is not None: notification_center.remove_observer(self, sender=self._xmpp_muc_session) # Send indication that the user has been kicked from the room sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('307') xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._xmpp_muc_session.end() self._xmpp_muc_session = None if self._sip_session is not None: notification_center.remove_observer(self, sender=self._sip_session) self._sip_session.end() self._sip_session = None self.ended = True notification_center.post_notification('X2SMucHandlerDidEnd', sender=self) @run_in_green_thread def _start_sip_session(self): # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = self.sip_identity.uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) self.end() return self._msrp_stream = ChatStream() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self._sip_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._sip_session) notification_center.add_observer(self, sender=self._msrp_stream) - self._sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self._msrp_stream]) + self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP multiparty session %s started" % notification.sender._invitation.call_id) if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: self.end() return message_id = self._msrp_stream.set_local_nickname(self.nickname) self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) self._first_stanza = None def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP multiparty session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP multiparty session %s failed" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self._sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self._sip_session.reject_transfer(403) def _NH_SIPSessionGotConferenceInfo(self, notification): # Translate to XMPP payload xmpp_manager = XMPPManager() own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) conference_info = notification.data.conference_info new_participants = set() for user in conference_info.users: user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) nickname = user.display_text.value if user.display_text else user.entity new_participants.add((user_uri, nickname)) # Remove participants that are no longer in the room for uri, nickname in self._participants - new_participants: sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) xmpp_manager.send_muc_stanza(stanza) # Send presence for current participants for uri, nickname in new_participants: if uri == own_uri: continue sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = Identity(uri) xmpp_manager.send_muc_stanza(stanza) self._participants = new_participants # Send own status last sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('110') xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream if not self._xmpp_muc_session: return message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None resource = message.sender.display_name or str(message.sender.uri) sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex) self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') def _NH_ChatStreamDidSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) self.nickname = nickname def _NH_ChatStreamDidNotSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)]) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(error_stanza) def _NH_ChatStreamDidDeliverMessage(self, notification): # Echo back the message to the sender stanza = self._pending_messages_map.pop(notification.data.message_id) stanza.sender, stanza.recipient = stanza.recipient, stanza.sender stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamDidNotDeliverMessage(self, notification): self._pending_messages_map.pop(notification.data.message_id) def _NH_XMPPIncomingMucSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session = None self.end() def _NH_XMPPIncomingMucSessionGotMessage(self, notification): if not self._sip_session: return message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri, display_name=self.nickname) message_id = self._msrp_stream.send_message(message.body, 'text/plain', local_identity=sender) self._pending_messages_map[message_id] = message # Message will be echoed back to the sender on ChatStreamDidDeliverMessage def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): if not self._sip_session: return nickname = notification.data.nickname try: message_id = self._msrp_stream.set_local_nickname(nickname) except ChatStreamError: return self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) diff --git a/sylk/session.py b/sylk/session.py index 60f81f1..fa8d903 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,595 +1,1931 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # import random -from datetime import datetime +from threading import RLock from time import time -from application.notification import IObserver, NotificationCenter, NotificationData +from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit +from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton +from application.system import host from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings -from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, sip_status_messages -from sipsimple.core import ContactHeader, RouteHeader, SubjectHeader, FromHeader, ToHeader -from sipsimple.core import SIPURI, SDPConnection, SDPSession +from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, SIPCoreInvalidStateError, PJSIPError, sip_status_messages +from sipsimple.core import ContactHeader, RouteHeader, FromHeader, ToHeader, ReasonHeader, WarningHeader +from sipsimple.core import SIPURI, SDPConnection, SDPSession, SDPMediaStream from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument -from sipsimple.session import Session as _Session -from sipsimple.session import SessionReplaceHandler, TransferHandler, DialogID, TransferInfo -from sipsimple.session import InvitationDisconnectedError, MediaStreamDidFailError, InterruptSubscription, TerminateSubscription, SubscriptionError, SIPSubscriptionDidFail -from sipsimple.session import transition_state from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread +from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount -from sylk.configuration import SIPConfig + + +class InvitationDisconnectedError(Exception): + def __init__(self, invitation, data): + self.invitation = invitation + self.data = data + +class MediaStreamDidNotInitializeError(Exception): + def __init__(self, stream, data): + self.stream = stream + self.data = data + +class MediaStreamDidFailError(Exception): + def __init__(self, stream, data): + self.stream = stream + self.data = data + +class SubscriptionError(Exception): + def __init__(self, error, timeout, **attributes): + self.error = error + self.timeout = timeout + self.attributes = attributes + +class SIPSubscriptionDidFail(Exception): + def __init__(self, data): + self.data = data + +class InterruptSubscription(Exception): + pass + +class TerminateSubscription(Exception): + pass + +class IllegalStateError(RuntimeError): + pass + +@decorator +def transition_state(required_state, new_state): + def state_transitioner(func): + @preserve_signature(func) + def wrapper(obj, *args, **kwargs): + with obj._lock: + if obj.state != required_state: + raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) + obj.state = new_state + return func(obj, *args, **kwargs) + return wrapper + return state_transitioner + +@decorator +def check_state(required_states): + def state_checker(func): + @preserve_signature(func) + def wrapper(obj, *args, **kwargs): + if obj.state not in required_states: + raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) + return func(obj, *args, **kwargs) + return wrapper + return state_checker class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) - refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) + refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: - transport = route.transport - parameters = {} if transport=='udp' else {'transport': transport} - contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) - subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), + try: + contact_uri = account.contact[route] + except KeyError: + continue + subscription = Subscription(target_uri, + FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError, e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() -class Session(_Session): +class Session(object): + implements(IObserver) + + media_stream_timeout = 15 + short_reinvite_timeout = 5 + + def __init__(self, account): + self.account = account + self.direction = None + self.end_time = None + self.on_hold = False + self.proposed_streams = None + self.route = None + self.state = None + self.start_time = None + self.streams = None + self.transport = None + self.local_focus = False + self.remote_focus = False + self.greenlet = None + self.conference = None + self._channel = coros.queue() + self._hold_in_progress = False + self._invitation = None + self._local_identity = None + self._remote_identity = None + self._lock = RLock() def init_incoming(self, invitation, data): remote_sdp = invitation.sdp.proposed_remote if not remote_sdp: invitation.send_response(488) return self.proposed_streams = [] for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if not self.proposed_streams: invitation.send_response(488) return self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) - self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True - try: - self.__dict__['subject'] = data.headers['Subject'].subject - except KeyError: - pass - if 'Referred-By' in data.headers or 'Replaces' in data.headers: - self.transfer_info = TransferInfo() - if 'Referred-By' in data.headers: - self.transfer_info.referred_by = data.headers['Referred-By'].body - if 'Replaces' in data.headers: - replaces_header = data.headers.get('Replaces') - replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) - session_manager = SessionManager() - try: - self.replaced_session = next(session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id) - except StopIteration: - invitation.send_response(481) - return - else: - self.transfer_info.replaced_dialog_id = replaced_dialog_id - replace_handler = SessionReplaceHandler(self) - replace_handler.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread - def connect(self, from_header, to_header, routes, streams, contact_header=None, is_focus=False, subject=None, extra_headers=[]): + def connect(self, from_header, to_header, route, streams, is_focus=False, contact_header=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False unhandled_notifications = [] + extra_headers = extra_headers or [] + + if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): + raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams - self.route = routes[0] + self.route = route self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) - self.transfer_handler = Null - self.__dict__['subject'] = subject notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] except KeyError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) local_ip = contact_header.uri.host connection = SDPConnection(local_ip) local_sdp = SDPSession(local_ip, name=settings.user_agent) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) route_header = RouteHeader(self.route.uri) if is_focus: contact_header.parameters['isfocus'] = None - if self.subject: - extra_headers.append(SubjectHeader(self.subject)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, ) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connected': if not connected: connected = True else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) - except (MediaStreamDidFailError, api.TimeoutError), e: + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' + elif isinstance(e, MediaStreamDidNotInitializeError): + error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) - self.end_time = datetime.now() + self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' elif e.data.originator == 'local' and e.data.code == 408: code = e.data.code reason = e.data.reason else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None - self.start_time = datetime.now() + self.start_time = ISOTimestamp.now() + any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) + if any_stream_ice: + self._reinvite_after_ice() + notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + self._send_hold() + + def _reinvite_after_ice(self): + # This function does not do any error checking, it's designed to be called at the end of connect and ad + self.state = 'sending_proposal' + self.greenlet = api.getcurrent() + + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + for index, stream in enumerate(self.streams): + local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) + self._invitation.send_reinvite(sdp=local_sdp) + + received_invitation_state = False + received_sdp_update = False + try: + with api.timeout(self.short_reinvite_timeout): + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for index, stream in enumerate(self.streams): + stream.update(local_sdp, remote_sdp, index) + else: + return + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + self.end() + return + except Exception: + pass + finally: + self.state = 'connected' + self.greenlet = None + + @check_state(['incoming', 'received_proposal']) + @run_in_green_thread + def send_ring_indication(self): + try: + self._invitation.send_response(180) + except SIPCoreInvalidStateError: + pass # The INVITE session might have already been cancelled; ignore the error + + @transition_state('incoming', 'accepting') + @run_in_green_thread + def accept(self, streams, is_focus=False, extra_headers=None): + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + settings = SIPSimpleSettings() + + self.local_focus = is_focus + connected = False + unhandled_notifications = [] + extra_headers = extra_headers or [] + + if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): + raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') + + for stream in self.proposed_streams: + if stream in streams: + notification_center.add_observer(self, sender=stream) + stream.initialize(self, direction='incoming') + self.proposed_streams = streams + + try: + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidInitialize': + wait_count -= 1 + + remote_sdp = self._invitation.sdp.proposed_remote + sdp_connection = remote_sdp.connection or next(media.connection for media in remote_sdp.media if media.connection is not None) + local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address + if local_ip is None: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') + return + connection = SDPConnection(local_ip) + local_sdp = SDPSession(local_ip, name=settings.user_agent) + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, media in enumerate(remote_sdp.media): + stream = stream_map.get(index, None) + if stream is not None: + media = stream.get_local_media(remote_sdp=remote_sdp, index=index) + if not media.has_ice_attributes and not media.has_ice_candidates: + media.connection = connection + else: + media = SDPMediaStream.new(media) + media.connection = connection + media.port = 0 + media.attributes = [] + media.bandwidth_info = [] + local_sdp.media.append(media) + contact_header = ContactHeader.new(self._invitation.local_contact_header) + try: + local_contact_uri = self.account.contact[self._invitation.transport] + contact_header.uri = local_contact_uri + except KeyError: + pass + if is_focus: + contact_header.parameters['isfocus'] = None + self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) + notification_center.post_notification('SIPSessionWillStart', sender=self) + # Local and remote SDPs will be set after the 200 OK is sent + while True: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + break + else: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) + return + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected': + if not connected: + connected = True + elif notification.data.prev_state == 'connected': + unhandled_notifications.append(notification) + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + wait_count = 0 + stream_map = dict((stream.index, stream) for stream in self.proposed_streams) + for index, local_media in enumerate(local_sdp.media): + remote_media = remote_sdp.media[index] + stream = stream_map.get(index, None) + if stream is not None: + if remote_media.port: + wait_count += 1 + stream.start(local_sdp, remote_sdp, index) + else: + notification_center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] + for stream in removed_streams: + notification_center.remove_observer(self, sender=stream) + self.proposed_streams.remove(stream) + del stream_map[stream.index] + stream.deactivate() + stream.end() + with api.timeout(self.media_stream_timeout): + while wait_count > 0 or not connected or self._channel: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected': + if not connected: + connected = True + elif notification.data.prev_state == 'connected': + unhandled_notifications.append(notification) + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + else: + unhandled_notifications.append(notification) + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + reason_header = None + if isinstance(e, api.TimeoutError): + if wait_count > 0: + error = 'media stream timed out while starting' + else: + error = 'No ACK received' + reason_header = ReasonHeader('SIP') + reason_header.cause = 500 + reason_header.text = 'Missing ACK' + elif isinstance(e, MediaStreamDidNotInitializeError): + error = 'media stream did not initialize: %s' % e.data.reason + reason_header = ReasonHeader('SIP') + reason_header.cause = 500 + reason_header.text = 'media stream did not initialize' + else: + error = 'media stream failed: %s' % e.data.reason + reason_header = ReasonHeader('SIP') + reason_header.cause = 500 + reason_header.text = 'media stream failed to start' + self.start_time = ISOTimestamp.now() + if self._invitation.state in ('incoming', 'early'): + self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) + else: + self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) + except InvitationDisconnectedError, e: + notification_center.remove_observer(self, sender=self._invitation) + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self.state = 'terminated' + if e.data.prev_state in ('incoming', 'early'): + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) + elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) + else: + notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) + self.end_time = ISOTimestamp.now() + notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) + self.greenlet = None + except SIPCoreInvalidStateError: + # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party + notification_center.remove_observer(self, sender=self._invitation) + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self.greenlet = None + self.state = 'terminated' + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) + except SIPCoreError, e: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) + else: + self.greenlet = None + self.state = 'connected' + self.streams = self.proposed_streams + self.proposed_streams = None + self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() + @transition_state('incoming', 'terminating') + @run_in_green_thread + def reject(self, code=603, reason=None): + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + + try: + self._invitation.send_response(code, reason) + with api.timeout(1): + while True: + notification = self._channel.wait() + if notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'disconnected': + break + except SIPCoreInvalidStateError: + # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party + notification_center.remove_observer(self, sender=self._invitation) + self.greenlet = None + self.state = 'terminated' + self.proposed_streams = None + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) + except SIPCoreError, e: + self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) + except api.TimeoutError: + notification_center.remove_observer(self, sender=self._invitation) + self.greenlet = None + self.state = 'terminated' + self.proposed_streams = None + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) + else: + notification_center.remove_observer(self, sender=self._invitation) + self.greenlet = None + self.state = 'terminated' + self.proposed_streams = None + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) + + @transition_state('received_proposal', 'accepting_proposal') + @run_in_green_thread + def accept_proposal(self, streams): + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + + unhandled_notifications = [] + + streams = [stream for stream in streams if stream in self.proposed_streams] + for stream in streams: + notification_center.add_observer(self, sender=stream) + stream.initialize(self, direction='incoming') + + try: + wait_count = len(streams) + while wait_count > 0: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidInitialize': + wait_count -= 1 + + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + remote_sdp = self._invitation.sdp.proposed_remote + connection = SDPConnection(local_sdp.address) + stream_map = dict((stream.index, stream) for stream in streams) + for index, media in enumerate(remote_sdp.media): + stream = stream_map.get(index, None) + if stream is not None: + media = stream.get_local_media(remote_sdp=remote_sdp, index=index) + if not media.has_ice_attributes and not media.has_ice_candidates: + media.connection = connection + if index < len(local_sdp.media): + local_sdp.media[index] = media + else: + local_sdp.media.append(media) + elif index >= len(local_sdp.media): # actually == is sufficient + media = SDPMediaStream.new(media) + media.connection = connection + media.port = 0 + media.attributes = [] + media.bandwidth_info = [] + local_sdp.media.append(media) + self._invitation.send_response(200, sdp=local_sdp) + + prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) + + received_invitation_state = False + received_sdp_update = False + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for stream in self.streams: + stream.update(local_sdp, remote_sdp, stream.index) + else: + self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) + return + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + + on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) + if on_hold_streams != prev_on_hold_streams: + hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) + notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), + partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) + + for stream in streams: + # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream becuase it couldn't + # negotiation failed. If there are more streams, however, the negotiation is considered successful as a + # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io + # OK, but we cannot really start the stream. -Saul + stream.start(local_sdp, remote_sdp, stream.index) + with api.timeout(self.media_stream_timeout): + wait_count = len(streams) + while wait_count > 0 or self._channel: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + else: + unhandled_notifications.append(notification) + except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e: + if isinstance(e, api.TimeoutError): + error = 'media stream timed out while starting' + elif isinstance(e, MediaStreamDidNotInitializeError): + error = 'media stream did not initialize: %s' % e.data.reason + else: + error = 'media stream failed: %s' % e.data.reason + self._fail_proposal(originator='remote', error=error) + except InvitationDisconnectedError, e: + self._fail_proposal(originator='remote', error='session ended') + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + except SIPCoreError, e: + self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) + else: + self.greenlet = None + self.state = 'connected' + self.streams = self.streams + streams + proposed_streams = self.proposed_streams[:] + self.proposed_streams = None + notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) + notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + self._send_hold() + + @transition_state('received_proposal', 'rejecting_proposal') + @run_in_green_thread + def reject_proposal(self, code=488, reason=None): + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + + try: + self._invitation.send_response(code, reason) + with api.timeout(1, None): + while True: + notification = self._channel.wait() + if notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + break + except SIPCoreError, e: + self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) + else: + self.greenlet = None + self.state = 'connected' + proposed_streams = self.proposed_streams[:] + self.proposed_streams = None + notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) + if self._hold_in_progress: + self._send_hold() + + def add_stream(self, stream): + self.add_streams([stream]) + + @transition_state('connected', 'sending_proposal') + @run_in_green_thread + def add_streams(self, streams): + streams = list(set(streams).difference(self.streams)) + if not streams: + self.state = 'connected' + return + + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + settings = SIPSimpleSettings() + unhandled_notifications = [] + + self.proposed_streams = streams + for stream in self.proposed_streams: + notification_center.add_observer(self, sender=stream) + stream.initialize(self, direction='outgoing') + + try: + wait_count = len(self.proposed_streams) + while wait_count > 0: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidInitialize': + wait_count -= 1 + elif notification.name == 'SIPInvitationChangedState': + # This is actually the only reason for which this notification could be received + if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': + self._fail_proposal(originator='local', error='received stream proposal') + self.handle_notification(notification) + return + + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + for stream in self.proposed_streams: + # Try to reuse a disabled media stream to avoid an ever-growing SDP + try: + index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) + reuse_media = True + except StopIteration: + index = len(local_sdp.media) + reuse_media = False + stream.index = index + media = stream.get_local_media(remote_sdp=None, index=index) + if reuse_media: + local_sdp.media[index] = media + else: + local_sdp.media.append(media) + self._invitation.send_reinvite(sdp=local_sdp) + notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) + + received_invitation_state = False + received_sdp_update = False + try: + with api.timeout(settings.sip.invite_timeout): + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for s in self.streams: + s.update(local_sdp, remote_sdp, s.index) + else: + self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) + return + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + if notification.data.code >= 300: + for stream in self.proposed_streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self.greenlet = None + self.state = 'connected' + proposed_streams = self.proposed_streams[:] + self.proposed_streams = None + notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) + return + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + except api.TimeoutError: + self.cancel_proposal() + return + + accepted_streams = [] + for stream in self.proposed_streams: + try: + remote_media = remote_sdp.media[stream.index] + except IndexError: + self._fail_proposal(originator='local', error='SDP media missing in answer') + return + else: + if remote_media.port: + stream.start(local_sdp, remote_sdp, stream.index) + accepted_streams.append(stream) + else: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + + with api.timeout(self.media_stream_timeout): + wait_count = len(accepted_streams) + while wait_count > 0: + notification = self._channel.wait() + if notification.name == 'MediaStreamDidStart': + wait_count -= 1 + except (MediaStreamDidFailError, api.TimeoutError), e: + if isinstance(e, api.TimeoutError): + error = 'media stream timed out while starting' + else: + error = 'media stream failed: %s' % e.data.reason + self._fail_proposal(originator='local', error=error) + except InvitationDisconnectedError, e: + self._fail_proposal(originator='local', error='session ended') + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + except SIPCoreError, e: + self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) + else: + self.greenlet = None + self.state = 'connected' + self.streams += accepted_streams + proposed_streams = self.proposed_streams + self.proposed_streams = None + any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) + if any_stream_ice: + self._reinvite_after_ice() + notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) + notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + self._send_hold() + + def remove_stream(self, stream): + self.remove_streams([stream]) + + @transition_state('connected', 'sending_proposal') + @run_in_green_thread + def remove_streams(self, streams): + streams = list(set(streams).intersection(self.streams)) + if not streams: + self.state = 'connected' + return + + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + unhandled_notifications = [] + + try: + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + for stream in streams: + notification_center.remove_observer(self, sender=stream) + stream.deactivate() + self.streams.remove(stream) + media = local_sdp.media[stream.index] + media.port = 0 + media.attributes = [] + media.bandwidth_info = [] + + self._invitation.send_reinvite(sdp=local_sdp) + + received_invitation_state = False + received_sdp_update = False + + with api.timeout(self.short_reinvite_timeout): + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for s in self.streams: + s.update(local_sdp, remote_sdp, s.index) + else: + # TODO + pass + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + if not (200 <= notification.data.code < 300): + break + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + except InvitationDisconnectedError, e: + for stream in streams: + stream.end() + self.greenlet = None + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): + for stream in streams: + stream.end() + self.end() + else: + for stream in streams: + stream.end() + self.greenlet = None + self.state = 'connected' + notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + self._send_hold() + + @transition_state('sending_proposal', 'cancelling_proposal') + @run_in_green_thread + def cancel_proposal(self): + if self.greenlet is not None: + api.kill(self.greenlet, api.GreenletExit()) + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + try: + self._invitation.cancel_reinvite() + while True: + try: + notification = self._channel.wait() + except MediaStreamDidFailError: + continue + if notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + if notification.data.code == 487: + proposed_streams = (self.proposed_streams or [])[:] + for stream in proposed_streams: + stream.deactivate() + stream.end() + self.state = 'connected' + self.proposed_streams = None + notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) + elif notification.data.code == 200: + self.end() + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + break + except SIPCoreError, e: + proposed_streams = (self.proposed_streams or [])[:] + for stream in proposed_streams: + stream.deactivate() + stream.end() + self.greenlet = None + self.state = 'connected' + self.proposed_streams = None + notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) + except InvitationDisconnectedError, e: + self.proposed_streams = None + self.greenlet = None + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + else: + self.proposed_streams = None + self.greenlet = None + self.state = 'connected' + finally: + if self._hold_in_progress: + self._send_hold() + + @run_in_green_thread + def hold(self): + if self.on_hold or self._hold_in_progress: + return + self._hold_in_progress = True + streams = (self.streams or []) + (self.proposed_streams or []) + if not streams: + return + for stream in streams: + stream.hold() + if self.state == 'connected': + self._send_hold() + + @run_in_green_thread + def unhold(self): + if not self.on_hold and not self._hold_in_progress: + return + self._hold_in_progress = False + streams = (self.streams or []) + (self.proposed_streams or []) + if not streams: + return + for stream in streams: + stream.unhold() + if self.state == 'connected': + self._send_unhold() + + @run_in_green_thread + def end(self): + if self.state in (None, 'terminating', 'terminated'): + return + if self.greenlet is not None: + api.kill(self.greenlet, api.GreenletExit()) + self.greenlet = None + notification_center = NotificationCenter() + if self._invitation is None or self._invitation.state is None: + # The invitation was not yet constructed + self.state = 'terminated' + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) + return + invitation_state = self._invitation.state + if invitation_state in ('disconnecting', 'disconnected'): + return + self.greenlet = api.getcurrent() + self.state = 'terminating' + if invitation_state == 'connected': + notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) + streams = (self.streams or []) + (self.proposed_streams or []) + for stream in streams[:]: + try: + notification_center.remove_observer(self, sender=stream) + except KeyError: + streams.remove(stream) + else: + stream.deactivate() + cancelling = invitation_state != 'connected' and self.direction == 'outgoing' + try: + self._invitation.end(timeout=1) + while True: + try: + notification = self._channel.wait() + except MediaStreamDidFailError: + continue + if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': + break + except SIPCoreError, e: + if cancelling: + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) + else: + self.end_time = ISOTimestamp.now() + notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) + except InvitationDisconnectedError, e: + # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE + if e.data.prev_state == 'connected': + self.end_time = ISOTimestamp.now() + notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) + elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) + else: + if e.data.originator == 'remote': + code = e.data.code + reason = e.data.reason + elif e.data.disconnect_reason == 'timeout': + code = 408 + reason = 'timeout' + else: + code = 0 + reason = None + if e.data.originator == 'remote' and code // 100 == 3: + redirect_identities = e.data.headers.get('Contact', []) + else: + redirect_identities = None + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) + else: + if cancelling: + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) + else: + self.end_time = ISOTimestamp.now() + notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) + finally: + for stream in streams: + stream.end() + notification_center.remove_observer(self, sender=self._invitation) + self.greenlet = None + self.state = 'terminated' + + @property + def local_identity(self): + if self._invitation is not None and self._invitation.local_identity is not None: + return self._invitation.local_identity + else: + return self._local_identity + + @property + def peer_address(self): + return self._invitation.peer_address if self._invitation is not None else None + + @property + def remote_identity(self): + if self._invitation is not None and self._invitation.remote_identity is not None: + return self._invitation.remote_identity + else: + return self._remote_identity + + @property + def remote_user_agent(self): + return self._invitation.remote_user_agent if self._invitation is not None else None + + def _cancel_hold(self): + notification_center = NotificationCenter() + try: + self._invitation.cancel_reinvite() + while True: + try: + notification = self._channel.wait() + except MediaStreamDidFailError: + continue + if notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + if notification.data.code == 200: + self.end() + return False + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + break + except SIPCoreError: + pass + except InvitationDisconnectedError, e: + self.greenlet = None + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + return False + return True + + def _send_hold(self): + self.state = 'sending_proposal' + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + + unhandled_notifications = [] + + try: + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + for stream in self.streams: + local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) + self._invitation.send_reinvite(sdp=local_sdp) + + received_invitation_state = False + received_sdp_update = False + + with api.timeout(self.short_reinvite_timeout): + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for stream in self.streams: + stream.update(local_sdp, remote_sdp, stream.index) + else: + # TODO + pass + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + except InvitationDisconnectedError, e: + self.greenlet = None + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + return + except api.TimeoutError: + if not self._cancel_hold(): + return + except SIPCoreError: + pass + + self.greenlet = None + self.on_hold = True + self.state = 'connected' + hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) + notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + self._hold_in_progress = False + else: + for stream in self.streams: + stream.unhold() + self._send_unhold() + + def _send_unhold(self): + self.state = 'sending_proposal' + self.greenlet = api.getcurrent() + notification_center = NotificationCenter() + + unhandled_notifications = [] + + try: + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + for stream in self.streams: + local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) + self._invitation.send_reinvite(sdp=local_sdp) + + received_invitation_state = False + received_sdp_update = False + + with api.timeout(self.short_reinvite_timeout): + while not received_invitation_state or not received_sdp_update: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for stream in self.streams: + stream.update(local_sdp, remote_sdp, stream.index) + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + except InvitationDisconnectedError, e: + self.greenlet = None + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = notification_center + self.handle_notification(notification) + return + except api.TimeoutError: + if not self._cancel_hold(): + return + except SIPCoreError: + pass + + self.greenlet = None + self.on_hold = False + self.state = 'connected' + notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) + for notification in unhandled_notifications: + self.handle_notification(notification) + if self._hold_in_progress: + for stream in self.streams: + stream.hold() + self._send_hold() + + def _fail(self, originator, code, reason, error, reason_header=None): + notification_center = NotificationCenter() + prev_inv_state = self._invitation.state + self.state = 'terminating' + if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): + notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) + if self._invitation.state not in (None, 'disconnecting', 'disconnected'): + try: + if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): + if 400 <= code <= 699 and reason is not None: + self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) + else: + self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) + with api.timeout(1): + while True: + notification = self._channel.wait() + if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': + break + except (api.TimeoutError, SIPCoreError): + pass + notification_center.remove_observer(self, sender=self._invitation) + self.state = 'terminated' + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) + self.greenlet = None + + def _fail_proposal(self, originator, error): + notification_center = NotificationCenter() + has_streams = bool(self.proposed_streams) + for stream in self.proposed_streams: + try: + notification_center.remove_observer(self, sender=stream) + except KeyError: + # _fail_proposal can be called from reject_proposal, which means the stream will + # not have been initialized or the session registered as an observer for it. + pass + else: + stream.deactivate() + stream.end() + if originator == 'remote' and self._invitation.sub_state == 'received_proposal': + try: + self._invitation.send_response(488 if has_streams else 500) + except SIPCoreError: + pass + notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) + self.state = 'connected' + self.proposed_streams = None + self.greenlet = None + + @run_in_green_thread + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_SIPInvitationChangedState(self, notification): + if self.state == 'terminated': + return + if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): + contact_header = notification.data.headers.get('Contact', None) + if contact_header and 'isfocus' in contact_header[0].parameters: + self.remote_focus = True + if self.greenlet is not None: + if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': + self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) + else: + self._channel.send(notification) + else: + self.greenlet = api.getcurrent() + unhandled_notifications = [] + try: + if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': + self.state = 'received_proposal' + try: + proposed_remote_sdp = self._invitation.sdp.proposed_remote + active_remote_sdp = self._invitation.sdp.active_remote + if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): + engine = Engine() + self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) + self.state = 'connected' + return + for stream in self.streams: + if not stream.validate_update(proposed_remote_sdp, stream.index): + engine = Engine() + self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) + self.state = 'connected' + return + added_media_indexes = set() + removed_media_indexes = set() + reused_media_indexes = set() + for index, media_stream in enumerate(proposed_remote_sdp.media): + if index >= len(active_remote_sdp.media): + added_media_indexes.add(index) + elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: + removed_media_indexes.add(index) + elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: + reused_media_indexes.add(index) + elif media_stream.media != active_remote_sdp.media[index].media: + added_media_indexes.add(index) + removed_media_indexes.add(index) + if added_media_indexes | reused_media_indexes and removed_media_indexes: + engine = Engine() + self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) + self.state = 'connected' + return + elif added_media_indexes | reused_media_indexes: + self.proposed_streams = [] + for index in added_media_indexes | reused_media_indexes: + media_stream = proposed_remote_sdp.media[index] + if media_stream.port != 0: + for stream_type in MediaStreamRegistry(): + try: + stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) + except InvalidStreamError: + break + except UnknownStreamError: + continue + else: + stream.index = index + self.proposed_streams.append(stream) + break + if self.proposed_streams: + self._invitation.send_response(100) + notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) + else: + self._invitation.send_response(488) + self.state = 'connected' + return + else: + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] + prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) + for stream in removed_streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + media = local_sdp.media[stream.index] + media.port = 0 + media.attributes = [] + media.bandwidth_info = [] + for stream in self.streams: + local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) + try: + self._invitation.send_response(200, sdp=local_sdp) + except PJSIPError: + for stream in removed_streams: + self.streams.remove(stream) + stream.end() + if removed_streams: + self.end() + return + else: + try: + self._invitation.send_response(488) + except PJSIPError: + self.end() + return + else: + for stream in removed_streams: + self.streams.remove(stream) + stream.end() + + received_invitation_state = False + received_sdp_update = False + while not received_sdp_update or not received_invitation_state or self._channel: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for stream in self.streams: + stream.update(local_sdp, remote_sdp, stream.index) + else: + # TODO + pass + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + else: + unhandled_notifications.append(notification) + else: + unhandled_notifications.append(notification) + on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) + if on_hold_streams != prev_on_hold_streams: + hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) + notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), + partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) + if removed_media_indexes: + notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) + except InvitationDisconnectedError, e: + self.greenlet = None + self.state == 'connected' + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = NotificationCenter() + self.handle_notification(notification) + except SIPCoreError: + self.end() + else: + self.state = 'connected' + elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': + self.state = 'received_proposal_request' + try: + # An empty proposal was received, generate an offer + self._invitation.send_response(100) + local_sdp = SDPSession.new(self._invitation.sdp.active_local) + local_sdp.version += 1 + connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) + if local_sdp.connection is not None: + local_sdp.connection.address = connection_address + for index, stream in enumerate(self.streams): + stream.reset(index) + media = stream.get_local_media(remote_sdp=None, index=index) + if media.connection is not None: + media.connection.address = connection_address + local_sdp.media[stream.index] = media + self._invitation.send_response(200, sdp=local_sdp) + received_invitation_state = False + received_sdp_update = False + while not received_sdp_update or not received_invitation_state or self._channel: + notification = self._channel.wait() + if notification.name == 'SIPInvitationGotSDPUpdate': + received_sdp_update = True + if notification.data.succeeded: + local_sdp = notification.data.local_sdp + remote_sdp = notification.data.remote_sdp + for stream in self.streams: + stream.update(local_sdp, remote_sdp, stream.index) + else: + # TODO + pass + elif notification.name == 'SIPInvitationChangedState': + if notification.data.state == 'connected' and notification.data.sub_state == 'normal': + received_invitation_state = True + elif notification.data.state == 'disconnected': + raise InvitationDisconnectedError(notification.sender, notification.data) + else: + unhandled_notifications.append(notification) + else: + unhandled_notifications.append(notification) + except InvitationDisconnectedError, e: + self.greenlet = None + self.state == 'connected' + notification = Notification('SIPInvitationChangedState', e.invitation, e.data) + notification.center = NotificationCenter() + self.handle_notification(notification) + except SIPCoreError: + raise # FIXME + else: + self.state = 'connected' + elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': + if notification.data.originator == 'local' and notification.data.code == 487: + self.state = 'connected' + proposed_streams = self.proposed_streams[:] + self.proposed_streams = None + notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) + if self._hold_in_progress: + self._send_hold() + elif notification.data.state == 'disconnected': + if self.state == 'incoming': + self.state = 'terminated' + if notification.data.originator == 'remote': + notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) + else: + # There must have been an error involved + notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) + else: + notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) + for stream in self.streams: + notification.center.remove_observer(self, sender=stream) + stream.deactivate() + stream.end() + self.state = 'terminated' + self.end_time = ISOTimestamp.now() + notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) + notification.center.remove_observer(self, sender=self._invitation) + finally: + self.greenlet = None + for notification in unhandled_notifications: + self.handle_notification(notification) + + def _NH_SIPInvitationGotSDPUpdate(self, notification): + if self.greenlet is not None: + self._channel.send(notification) + + def _NH_SIPInvitationTransferNewIncoming(self, notification): + self._invitation.notify_transfer_progress(500) + + def _NH_MediaStreamDidInitialize(self, notification): + if self.greenlet is not None: + self._channel.send(notification) + + def _NH_MediaStreamDidStart(self, notification): + if self.greenlet is not None: + self._channel.send(notification) + + def _NH_MediaStreamDidNotInitialize(self, notification): + if self.greenlet is not None and self.state not in ('terminating', 'terminated'): + self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) + + def _NH_MediaStreamDidFail(self, notification): + if self.greenlet is not None: + if self.state not in ('terminating', 'terminated'): + self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) + else: + stream = notification.sender + if self.streams == [stream]: + self.end() + else: + try: + self.remove_stream(stream) + except IllegalStateError: + self.end() + class SessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: account = DefaultAccount() notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification)