diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 38ae059..1ed2d36 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1078 +1,1078 @@ import os import random import shutil import string import weakref from collections import Counter, deque from glob import glob from itertools import chain, count, cycle from application.notification import IObserver, NotificationCenter from application.python import Null from application.system import makedirs from eventlib import api, coros, proc from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.application import SIPApplication from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatIdentity, CPIMHeader, CPIMNamespace from sipsimple.streams.msrp.filetransfer import FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.bonjour import BonjourService from sylk.configuration import ServerConfig, ThorNodeConfig from sylk.configuration.datatypes import URL from sylk.resources import Resources from sylk.session import Session, IllegalStateError from sylk.web import server as web_server def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s <%s@%s>' % (identity.display_name, uri.user, uri.host) else: return u'%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.room_uri = room.uri self.sender = sender self.filename = os.path.join(ConferenceConfig.screensharing_images_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) self.url = URL(web_server.url + '/conference/' + room.uri + '/screensharing') self.url.query_items['image'] = os.path.basename(self.filename) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError as e: log.info('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url) room.dispatch_server_message(txt) log.info(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender) room.dispatch_server_message(txt) log.info(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.config = get_room_config(uri) self.uri = uri self.identity = ChatIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room') self.files = [] self.screen_images = {} self.sessions = [] self.subscriptions = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.conference_info_version = count(1) self.bonjour_services = Null self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = Counter() self.history = deque(maxlen=ConferenceConfig.history_size) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): - return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) + return set(stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams))) @property def conference_info(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) conference_description.conf_uris = conference.ConfUris() conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation')) if self.config.advertise_xmpp_support: conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation')) # TODO: add grouptextchat service uri for number in self.config.pstn_access_numbers: conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation')) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len(self.participants_counter) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) user = conference.User(str(session.remote_identity.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, 'OK') for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def start(self): if self.started: return if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference': room_user = self.identity.uri.user self.bonjour_services = BonjourService(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screensharing_images_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: continue if message.content.startswith('?OTR:'): continue if message.timestamp is None: message.timestamp = ISOTimestamp.utcnow() message.sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), message.sender.display_name) recipient = message.recipients[0] private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_message(session, message) else: self.history.append(message) self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: continue recipient = data.recipients[0] private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity, recipients=[self.identity]) def dispatch_private_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity) def dispatch_server_message(self, content, content_type='text/plain', exclude=None): ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity], additional_headers=[message_type]) def dispatch_conference_info(self): data = self.conference_info for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = file.sender.uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = FileTransferHandler(self) handler.init_outgoing(uri, file) def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate, audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) if audio_stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if audio_stream.encryption.active: log.info(u'Room %s - %s audio stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), audio_stream.encryption.type)) else: log.info(u'Room %s - %s audio stream did not enable encryption' % (self.uri, format_identity(session.remote_identity))) try: transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: transfer_handler = FileTransferHandler(self) transfer_handler.init_incoming(transfer_stream) if transfer_stream.direction == 'recvonly': filename = os.path.basename(os.path.splitext(transfer_stream.file_selector.name)[0]) txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), filename,self.format_file_size(transfer_stream.file_selector.size)) else: filename = os.path.basename(transfer_stream.file_selector.name) txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), filename) log.info(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, initial=True, session=session, streams=session.streams) welcome_handler.run() self.dispatch_conference_info() if len(self.sessions) == 1: log.info(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) else: log.info(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] == 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.info(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session))) if not self.sessions: log.info(u'Room %s - Last participant left conference' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def handle_incoming_subscription(self, subscribe_request, data): log.info('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri)) if subscribe_request.event != 'conference': log.info('Room %s - Subscription for event %s rejected: only conference event is supported' % (self.uri, subscribe_request.event)) subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) try: subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info) except SIPCoreError as e: log.warning('Error accepting SIP subscription: %s' % e) subscribe_request.end() def _accept_proposal(self, session, streams): try: session.accept_proposal(streams) except IllegalStateError: pass session.proposal_timer = None def add_file(self, file): self.dispatch_server_message('%s has uploaded file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) def _update_bonjour_presence(self): num = len(self.sessions) if num == 0: num_str = 'No' elif num == 1: num_str = 'One' elif num == 2: num_str = 'Two' else: num_str = str(num) txt = u'%s participant%s' % (num_str, '' if num==1 else 's') presence_state = BonjourPresenceState('available', txt) if self.bonjour_services is Null: # This is the room being published all the time from sylk.applications.conference import ConferenceApplication ConferenceApplication().bonjour_room_service.presence_state = presence_state else: self.bonjour_services.presence_state = presence_state @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): stream = notification.sender session = stream.session log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) def _NH_RTPStreamDidNotEnableEncryption(self, notification): stream = notification.sender session = stream.session log.info(u'Room %s - %s %s stream did not enable encryption: %s' % (self.uri, format_identity(session.remote_identity), stream.type, notification.data.reason)) def _NH_RTPStreamZRTPReceivedSAS(self, notification): if not self.config.zrtp_auto_verify: return stream = notification.sender session = stream.session sas = notification.data.sas # Send ZRTP SAS over the chat stream, if available try: chat_stream = next(stream for stream in session.streams if stream.type=='chat') except StopIteration: return # Only send the message if there are no relays in between secure_chat = chat_stream.transport == 'tls' and all(len(path)==1 for path in (chat_stream.msrp.full_local_path, chat_stream.msrp.full_remote_path)) if secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') chat_stream.send_message(txt, 'text/plain', sender=self.identity, additional_headers=[message_type]) def _NH_RTPStreamDidTimeout(self, notification): stream = notification.sender if stream.type != 'audio': return session = stream.session log.info(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity))) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith(('text/', 'image/')): stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.add_screen_image(message.sender, message.content) elif content_type == 'application/blink-zrtp-sas': if not self.config.zrtp_auto_verify: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return try: audio_stream = next(stream for stream in session.streams if stream.type=='audio' and stream.encryption.active and stream.encryption.type=='ZRTP') except StopIteration: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return # Only trust it if there was a direct path and the transport is TLS secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) remote_sas = str(message.content) if remote_sas == audio_stream.encryption.zrtp.sas and secure_chat: audio_stream.encryption.zrtp.verified = True stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and (session not in self.session_nickname_map or self.session_nickname_map[session] != nickname): notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender try: self.subscriptions.remove(subscription) except ValueError: pass else: notification.center.remove_observer(self, sender=subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.info(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity))) else: log.info(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity))) self.dispatch_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(3, self._accept_proposal, session, streams) old_timer = getattr(session, 'proposal_timer', None) assert old_timer is None session.proposal_timer = timer def _NH_SIPSessionProposalRejected(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None: timer.cancel() session.proposal_timer = None def _NH_SIPSessionHadProposalFailure(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) assert timer is not None timer.cancel() session.proposal_timer = None def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type) log.info(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate, stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) if stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if stream.encryption.active: log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) else: log.info(u'Room %s - %s %s stream did not enable encryption' % (self.uri, format_identity(session.remote_identity), stream.type)) if notification.data.added_streams: welcome_handler = WelcomeHandler(self, initial=False, session=session, streams=notification.data.added_streams) welcome_handler.run() for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type) log.info(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() if not session.streams: log.info(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity))) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): log.info(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri) notification.sender.reject_transfer(403) def _NH_SIPSessionWillEnd(self, notification): session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None and timer.active(): timer.cancel() session.proposal_timer = None @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % Resources.get('sounds/moh')) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20) self.paused = True self.conference.bridge.add(self._player) NotificationCenter().add_observer(self, sender=self._player) def stop(self): if self._player is None: return NotificationCenter().remove_observer(self, sender=self._player) self._player.stop() self.paused = True self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() def _play_next_file(self): self._player.filename = next(self.files) self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail class WelcomeHandler(object): implements(IObserver) def __init__(self, room, initial, session, streams): self.room = room self.initial = initial self.session = session self.streams = streams self.procs = proc.RunningProcSet() @run_in_green_thread def run(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) for stream in self.streams: if stream.type == 'audio': self.procs.spawn(self.audio_welcome, stream) elif stream.type == 'chat': self.procs.spawn(self.chat_welcome, stream) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.streams = None self.room = None self.procs = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError as e: log.warning(u'Error playing file %s: %s' % (file, e)) def audio_welcome(self, stream): player = WavePlayer(stream.mixer, '', pause_time=1, initial_delay=1, volume=50) stream.bridge.add(player) try: if self.initial: file = Resources.get('sounds/co_welcome_conference.wav') self.play_file_in_player(player, file, 1) user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri and any(stream for stream in s.streams if stream.type == 'audio')}) if user_count == 0: file = Resources.get('sounds/co_only_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = Resources.get('sounds/co_there_is_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = Resources.get('sounds/co_there_are.wav') self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = Resources.get('sounds/bi_%d.wav' % user_count) self.play_file_in_player(player, file, 0.1) else: file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/bi_%d.wav' % (user_count % 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/co_more_participants.wav') self.play_file_in_player(player, file, 0) file = Resources.get('sounds/connected_tone.wav') self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: stream.bridge.remove(player) self.room.audio_conference.add(stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() finally: player.stop() def chat_welcome(self, stream): if self.initial: txt = 'Welcome to SylkServer!' else: txt = '' user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri}) if user_count == 0: txt += ' You are the first participant' else: if user_count == 1: txt += ' There is one more participant' else: txt += ' There are %s more participants' % user_count txt += ' in this conference room.' if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers or self.room.config.webrtc_gateway_url: txt += '\n\nOther participants can join at these addresses:\n\n' txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri if self.room.config.webrtc_gateway_url: webrtc_url = str(self.room.config.webrtc_gateway_url).replace('$room', self.room.uri) txt += ' - Using a WEB browser go to %s (audio only)\n' % webrtc_url if self.room.config.advertise_xmpp_support: txt += ' - Using an XMPP Jingle client, add contact %s and call it (audio and chat)\n' % self.room.uri if self.room.config.pstn_access_numbers: if len(self.room.config.pstn_access_numbers) == 1: nums = self.room.config.pstn_access_numbers[0] else: nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1] txt += ' - Using a landline or mobile phone, dial %s (audio only)\n' % nums stream.send_message(txt, 'text/plain', sender=self.room.identity, recipients=[self.room.identity]) for msg in self.room.history: stream.send_message(msg.content, msg.content_type, sender=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp) # Send ZRTP SAS over the chat stream, if applicable if self.room.config.zrtp_auto_verify: session = stream.session try: audio_stream = next(stream for stream in session.streams if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: # Only send the message if there are no relays in between secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) sas = audio_stream.encryption.zrtp.sas if sas is not None and secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') stream.send_message(txt, 'text/plain', sender=self.room.identity, additional_headers=[message_type]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall() class RoomFile(object): def __init__(self, name, hash, size, sender): self.name = name self.hash = hash self.size = size self.sender = sender @property def file_selector(self): return FileSelector.for_file(self.name, hash=self.hash) class FileTransferHandler(object): implements(IObserver) def __init__(self, room): self.room = weakref.ref(room) self.session = None self.stream = None self.handler = None self.direction = None def init_incoming(self, stream): self.direction = 'incoming' self.stream = stream self.session = stream.session self.handler = stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) @run_in_green_thread def init_outgoing(self, destination, file): self.direction = 'outgoing' room = self.room() if room is None: return settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(destination) lookup = DNSLookup() try: route = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()[0] except (DNSLookupError, IndexError): return self.session = Session(account) self.stream = MediaStreamRegistry.get('file-transfer')(file.file_selector, 'sendonly') self.handler = self.stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) from_header = FromHeader(SIPURI.new(room.identity.uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(destination)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(file.sender.uri))) extra_headers.append(SubjectHeader(u'File uploaded by %s' % file.sender)) self.session.connect(from_header, to_header, route=route, streams=[self.stream], is_focus=True, extra_headers=extra_headers) def _terminate(self, failure_reason=None): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.handler) room = self.room() if room is not None: if failure_reason is None: if self.direction == 'incoming' and self.stream.direction == 'recvonly': sender = ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) file = RoomFile(self.stream.file_selector.name, self.stream.file_selector.hash, self.stream.file_selector.size, sender) room.add_file(file) else: room.dispatch_server_message('File transfer for %s failed: %s' % (os.path.basename(self.stream.file_selector.name), failure_reason)) self.session = None self.stream = None self.handler = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): self._terminate(failure_reason=notification.data.reason) def _NH_FileTransferHandlerDidEnd(self, notification): if self.direction == 'incoming': if self.stream.direction == 'sendonly': reactor.callLater(3, self.session.end) else: reactor.callLater(1, self.session.end) else: self.session.end() self._terminate(failure_reason=notification.data.reason) diff --git a/sylk/applications/xmppgateway/__init__.py b/sylk/applications/xmppgateway/__init__.py index 048cfb9..2a64b16 100644 --- a/sylk/applications/xmppgateway/__init__.py +++ b/sylk/applications/xmppgateway/__init__.py @@ -1,554 +1,554 @@ from application.notification import IObserver, NotificationCenter from application.python import Null from sipsimple.core import SIPURI, SIPCoreError from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage from sipsimple.streams.msrp.chat import CPIMPayload, CPIMParserError from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.applications import SylkApplication from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.im import SIPMessageSender, SIPMessageError, ChatSessionHandler from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.presence import S2XPresenceHandler, X2SPresenceHandler from sylk.applications.xmppgateway.media import MediaSessionHandler from sylk.applications.xmppgateway.muc import X2SMucInvitationHandler, S2XMucInvitationHandler, X2SMucHandler from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, NormalMessage class XMPPGatewayApplication(SylkApplication): implements(IObserver) def __init__(self): self.xmpp_manager = XMPPManager() self.pending_sessions = {} self.chat_sessions = set() self.media_sessions = set() self.s2x_muc_sessions = {} self.x2s_muc_sessions = {} self.s2x_presence_subscriptions = {} self.x2s_presence_subscriptions = {} self.s2x_muc_add_participant_handlers = {} self.x2s_muc_add_participant_handlers = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.start() def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_manager) notification_center.add_observer(self, name='JingleSessionNewIncoming') self.xmpp_manager.stop() def incoming_session(self, session): - stream_types = set([stream.type for stream in session.proposed_streams]) + stream_types = set(stream.type for stream in session.proposed_streams) if 'chat' in stream_types: log.info('New chat session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_chat_session(session) elif 'audio' in stream_types: log.info('New audio session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) self.incoming_media_session(session) else: log.info('New session from %s to %s rejected. Unsupported media: %s ' % (session.remote_identity.uri, session.local_identity.uri, stream_types)) session.reject(488) def incoming_chat_session(self, session): # Check if this session is really an invitation to add a participant to a conference room / muc if session.remote_identity.uri.host in self.xmpp_manager.muc_domains and 'isfocus' in session._invitation.remote_contact_header.parameters: try: referred_by_uri = SIPURI.parse(session.transfer_info.referred_by) except SIPCoreError: log.info("SIP multiparty session invitation %s failed: invalid Referred-By header" % session.call_id) session.reject(488) return muc_uri = FrozenURI(session.remote_identity.uri.user, session.remote_identity.uri.host) inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) recipient_uri = FrozenURI(session.local_identity.uri.user, session.local_identity.uri.host) sender = Identity(muc_uri) recipient = Identity(recipient_uri) inviter = Identity(inviter_uri) try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] except KeyError: handler = S2XMucInvitationHandler(session, sender, recipient, inviter) self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() else: log.info("SIP multiparty session invitation %s failed: there is another invitation in progress from %s to %s" % (session.call_id, format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'))) session.reject(480) return # Check domain if session.remote_identity.uri.host not in XMPPGatewayConfig.domains: log.info('Session rejected: From domain is not a local XMPP domain') session.reject(606, 'Not Acceptable') return # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) # Get URI representing the XMPP side request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: pass else: # There is another pending session with same identifiers, can't accept this one log.info('Session rejected: other session with same identifiers in progress') session.reject(488) return sip_identity = Identity(sip_leg_uri, session.remote_identity.display_name) handler = ChatSessionHandler.new_from_sip_session(sip_identity, session) NotificationCenter().add_observer(self, sender=handler) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler if xmpp_leg_uri.resource is not None: # Incoming session target contained GRUU, so create XMPPChatSession immediately xmpp_session = XMPPChatSession(local_identity=handler.sip_identity, remote_identity=Identity(xmpp_leg_uri)) handler.xmpp_identity = xmpp_session.remote_identity handler.xmpp_session = xmpp_session def incoming_media_session(self, session): if session.remote_identity.uri.host not in self.xmpp_manager.domains|self.xmpp_manager.muc_domains: log.info('Session rejected: From domain is not a local XMPP domain') session.reject(403) return handler = MediaSessionHandler.new_from_sip_session(session) if handler is not None: NotificationCenter().add_observer(self, sender=handler) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if XMPPGatewayConfig.log_presence: log.info('SIP subscription from %s to %s' % (format_uri(from_header.uri, 'sip'), format_uri(to_header.uri, 'xmpp'))) if subscribe_request.event != 'presence': if XMPPGatewayConfig.log_presence: log.info('SIP subscription rejected: only presence event is supported') subscribe_request.reject(489) return # Check domain remote_identity_uri = data.headers['From'].uri if remote_identity_uri.host not in XMPPGatewayConfig.domains: if XMPPGatewayConfig.log_presence: log.info('SIP subscription rejected: From domain is not a local XMPP domain') subscribe_request.reject(606) return # Get URI representing the SIP side sip_leg_uri = FrozenURI(remote_identity_uri.user, remote_identity_uri.host) # Get URI representing the XMPP side request_uri = data.request_uri xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host) try: handler = self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: sip_identity = Identity(sip_leg_uri, data.headers['From'].display_name) xmpp_identity = Identity(xmpp_leg_uri) handler = S2XPresenceHandler(sip_identity, xmpp_identity) self.s2x_presence_subscriptions[(sip_leg_uri, xmpp_leg_uri)] = handler NotificationCenter().add_observer(self, sender=handler) handler.start() handler.add_sip_subscription(subscribe_request) def incoming_referral(self, refer_request, data): refer_request.reject(405) def incoming_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null).content_type from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (content_type, from_header, to_header): message_request.answer(400) return log.info('New SIP Message from %s to %s' % (from_header.uri, to_header.uri)) # Check domain if from_header.uri.host not in XMPPGatewayConfig.domains: log.info('Message rejected: From domain is not a local XMPP domain') message_request.answer(606) return if content_type == 'message/cpim': try: cpim_message = CPIMPayload.decode(data.body) except CPIMParserError: log.info('Message rejected: CPIM parse error') message_request.answer(400) return else: body = cpim_message.content content_type = cpim_message.content_type sender = cpim_message.sender or from_header from_uri = sender.uri else: body = data.body from_uri = from_header.uri to_uri = str(to_header.uri) message_request.answer(200) if from_uri.parameters.get('gr', None) is None: from_uri = SIPURI.new(from_uri) from_uri.parameters['gr'] = generate_sylk_resource() sender = Identity(FrozenURI.parse(from_uri)) recipient = Identity(FrozenURI.parse(to_uri)) if content_type in ('text/plain', 'text/html'): if content_type == 'text/plain': html_body = None else: html_body = body body = None if XMPPGatewayConfig.use_msrp_for_chat: message = NormalMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) else: message = ChatMessage(sender, recipient, body, html_body, use_receipt=False) self.xmpp_manager.send_stanza(message) elif content_type == IsComposingDocument.content_type: if not XMPPGatewayConfig.use_msrp_for_chat: try: msg = IsComposingMessage.parse(body) except ParserError: pass else: state = 'composing' if msg.state == 'active' else 'paused' message = ChatComposingIndication(sender, recipient, state, use_receipt=False) self.xmpp_manager.send_stanza(message) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Out of band XMPP stanza handling @run_in_green_thread def _NH_XMPPGotChatMessage(self, notification): # This notification is only processed here untill the ChatSessionHandler # has both (SIP and XMPP) sessions established message = notification.data.message sender = message.sender recipient = message.recipient if XMPPGatewayConfig.use_msrp_for_chat: if recipient.uri.resource is None: # If recipient resource is not set the session is started from # the XMPP side sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] handler.enqueue_xmpp_message(message) except KeyError: # Check if we have any already open chat session and dispatch it there try: handler = next(h for h in self.chat_sessions if h.xmpp_identity.uri.user == xmpp_leg_uri.user and h.xmpp_identity.uri.host == xmpp_leg_uri.host and h.sip_identity.uri.user == sip_leg_uri.user and h.sip_identity.uri.host == sip_leg_uri.host) except StopIteration: # Not found, need to create a new handler and a outgoing SIP session xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI.new(recipient.uri) xmpp_leg_uri = FrozenURI(sender.uri.user, sender.uri.host) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Find handler pending XMPP confirmation sip_leg_uri = FrozenURI(recipient.uri.user, recipient.uri.host) xmpp_leg_uri = FrozenURI.new(sender.uri) try: handler = self.pending_sessions[(sip_leg_uri, xmpp_leg_uri)] except KeyError: # Try harder, maybe the XMPP client changed his from try: handler = next(h for h in self.chat_sessions if h.xmpp_identity.uri.user == xmpp_leg_uri.user and h.xmpp_identity.uri.host == xmpp_leg_uri.host and h.sip_identity.uri.user == sip_leg_uri.user and h.sip_identity.uri.host == sip_leg_uri.host) except StopIteration: # It's a new XMPP session to a full JID, disregard the full JID and start a new SIP session to the bare JID xmpp_identity = Identity(xmpp_leg_uri) handler = ChatSessionHandler.new_from_xmpp_stanza(xmpp_identity, sip_leg_uri) key = (sip_leg_uri, xmpp_leg_uri) self.pending_sessions[key] = handler NotificationCenter().add_observer(self, sender=handler) handler.enqueue_xmpp_message(message) else: # Found handle, create XMPP session and establish session session = XMPPChatSession(local_identity=recipient, remote_identity=sender) handler.enqueue_xmpp_message(message) handler.xmpp_identity = session.remote_identity handler.xmpp_session = session else: sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotNormalMessage(self, notification): message = notification.data.message sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) @run_in_green_thread def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication sender = composing_indication.sender recipient = composing_indication.recipient if not XMPPGatewayConfig.use_msrp_for_chat: state = 'active' if composing_indication.state == 'composing' else 'idle' body = IsComposingMessage(state=state, refresh=composing_indication.interval or 30).toxml() message = NormalMessage(sender, recipient, body, IsComposingDocument.content_type) sip_message_sender = SIPMessageSender(message) try: sip_message_sender.send().wait() except SIPMessageError as e: # TODO report back an error stanza log.error('Error sending SIP Message: %s' % e) def _NH_XMPPGotPresenceSubscriptionRequest(self, notification): stanza = notification.data.stanza # Disregard the resource part, the presence request could be a probe instead of a subscribe sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: handler = self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] except KeyError: xmpp_identity = stanza.sender xmpp_identity.uri = sender_uri_bare sip_identity = stanza.recipient handler = X2SPresenceHandler(sip_identity, xmpp_identity) self.x2s_presence_subscriptions[(sender_uri_bare, stanza.recipient.uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() def _NH_XMPPGotMucJoinRequest(self, notification): stanza = notification.data.stanza muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) nickname = stanza.recipient.uri.resource try: handler = self.x2s_muc_sessions[(stanza.sender.uri, muc_uri)] except KeyError: xmpp_identity = stanza.sender sip_identity = stanza.recipient sip_identity.uri = muc_uri handler = X2SMucHandler(sip_identity, xmpp_identity, nickname) handler._first_stanza = stanza notification.center.add_observer(self, sender=handler) handler.start() # Check if there was a pending join request on the SIP side try: handler = self.s2x_muc_add_participant_handlers[(muc_uri, FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host))] except KeyError: pass else: handler.stop() def _NH_XMPPGotMucAddParticipantRequest(self, notification): sender = notification.data.sender recipient = notification.data.recipient participant = notification.data.participant muc_uri = FrozenURI(recipient.uri.user, recipient.uri.host) sender_uri = FrozenURI(sender.uri.user, sender.uri.host) participant_uri = FrozenURI(participant.uri.user, participant.uri.host) sender = Identity(sender_uri) recipient = Identity(muc_uri) participant = Identity(participant_uri) try: handler = self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] except KeyError: handler = X2SMucInvitationHandler(sender, recipient, participant) self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] = handler notification.center.add_observer(self, sender=handler) handler.start() # Chat session handling def _NH_ChatSessionDidStart(self, notification): handler = notification.sender log.info('Chat session established sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) for k,v in self.pending_sessions.items(): if v is handler: del self.pending_sessions[k] break self.chat_sessions.add(handler) def _NH_ChatSessionDidEnd(self, notification): handler = notification.sender log.info('Chat session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.chat_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_ChatSessionDidFail(self, notification): handler = notification.sender uris = None for k,v in self.pending_sessions.items(): if v is handler: uris = k del self.pending_sessions[k] break sip_uri, xmpp_uri = uris log.info('Chat session failed sip:%s <--> xmpp:%s (%s)' % (sip_uri, xmpp_uri, notification.data.reason)) notification.center.remove_observer(self, sender=handler) # Presence handling def _NH_S2XPresenceHandlerDidStart(self, notification): handler = notification.sender if XMPPGatewayConfig.log_presence: log.info('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.info('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions), len(self.x2s_presence_subscriptions))) def _NH_S2XPresenceHandlerDidEnd(self, notification): handler = notification.sender self.s2x_presence_subscriptions.pop((handler.sip_identity.uri, handler.xmpp_identity.uri), None) notification.center.remove_observer(self, sender=handler) if XMPPGatewayConfig.log_presence: log.info('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.sip_identity.uri, 'sip'), format_uri(handler.xmpp_identity.uri, 'xmpp'))) log.info('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions), len(self.x2s_presence_subscriptions))) def _NH_X2SPresenceHandlerDidStart(self, notification): handler = notification.sender if XMPPGatewayConfig.log_presence: log.info('Presence flow 0x%x established %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.info('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions), len(self.x2s_presence_subscriptions))) def _NH_X2SPresenceHandlerDidEnd(self, notification): handler = notification.sender self.x2s_presence_subscriptions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) if XMPPGatewayConfig.log_presence: log.info('Presence flow 0x%x ended %s --> %s' % (id(handler), format_uri(handler.xmpp_identity.uri, 'xmpp'), format_uri(handler.sip_identity.uri, 'sip'))) log.info('%d SIP --> XMPP and %d XMPP --> SIP presence flows are active' % (len(self.s2x_presence_subscriptions), len(self.x2s_presence_subscriptions))) # MUC handling def _NH_X2SMucHandlerDidStart(self, notification): handler = notification.sender log.info('Multiparty session established xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions[(handler.xmpp_identity.uri, handler.sip_identity.uri)] = handler def _NH_X2SMucHandlerDidEnd(self, notification): handler = notification.sender log.info('Multiparty session ended xmpp:%s --> sip:%s' % (handler.xmpp_identity.uri, handler.sip_identity.uri)) self.x2s_muc_sessions.pop((handler.xmpp_identity.uri, handler.sip_identity.uri), None) notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidStart(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.info('%s invited %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) def _NH_X2SMucInvitationHandlerDidEnd(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.info('%s added %s to multiparty chat %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'))) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_X2SMucInvitationHandlerDidFail(self, notification): handler = notification.sender sender_uri = handler.sender.uri muc_uri = handler.recipient.uri participant_uri = handler.participant.uri log.info('%s could not add %s to multiparty chat %s: %s' % (format_uri(sender_uri, 'xmpp'), format_uri(participant_uri), format_uri(muc_uri, 'sip'), notification.data.failure)) del self.x2s_muc_add_participant_handlers[(muc_uri, participant_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidStart(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.info("%s invited %s to multiparty chat %s" % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) def _NH_S2XMucInvitationHandlerDidEnd(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.info('%s added %s to multiparty chat %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) def _NH_S2XMucInvitationHandlerDidFail(self, notification): handler = notification.sender muc_uri = handler.sender.uri inviter_uri = handler.inviter.uri recipient_uri = handler.recipient.uri log.info('%s could not add %s to multiparty chat %s: %s' % (format_uri(inviter_uri, 'sip'), format_uri(recipient_uri, 'xmpp'), format_uri(muc_uri, 'sip'), str(notification.data.failure))) del self.s2x_muc_add_participant_handlers[(muc_uri, recipient_uri)] notification.center.remove_observer(self, sender=handler) # Media sessions def _NH_JingleSessionNewIncoming(self, notification): session = notification.sender handler = MediaSessionHandler.new_from_jingle_session(session) if handler is not None: notification.center.add_observer(self, sender=handler) def _NH_MediaSessionHandlerDidStart(self, notification): handler = notification.sender log.info('Media session started sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.add(handler) def _NH_MediaSessionHandlerDidEnd(self, notification): handler = notification.sender log.info('Media session ended sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) self.media_sessions.remove(handler) notification.center.remove_observer(self, sender=handler) def _NH_MediaSessionHandlerDidFail(self, notification): handler = notification.sender log.info('Media session failed sip:%s <--> xmpp:%s' % (handler.sip_identity.uri, handler.xmpp_identity.uri)) notification.center.remove_observer(self, sender=handler) diff --git a/sylk/applications/xmppgateway/media.py b/sylk/applications/xmppgateway/media.py index 50699a8..3d9735b 100644 --- a/sylk/applications/xmppgateway/media.py +++ b/sylk/applications/xmppgateway/media.py @@ -1,327 +1,327 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlib.twistedutil import block_on from sipsimple.audio import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import FromHeader, ToHeader from sipsimple.core import SIPURI, SIPCoreError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams import MediaStreamRegistry as SIPMediaStreamRegistry from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, decode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry as JingleMediaStreamRegistry from sylk.applications.xmppgateway.xmpp.stanzas import jingle from sylk.session import Session __all__ = 'MediaSessionHandler', class MediaSessionHandler(object): implements(IObserver) def __init__(self): self.started = False self.ended = False self._sip_identity = None self._xmpp_identity = None self._audio_bidge = AudioConference() self.sip_session = None self.jingle_session = None @classmethod def new_from_sip_session(cls, session): - proposed_stream_types = set([stream.type for stream in session.proposed_streams]) + proposed_stream_types = set(stream.type for stream in session.proposed_streams) streams = [] for stream_type in proposed_stream_types: try: klass = JingleMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject(488) return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) # Get URI representing the SIP side contact_uri = session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) instance._sip_identity = Identity(sip_leg_uri) # Get URI representing the XMPP side request_uri = session.request_uri remote_resource = request_uri.parameters.get('gr', None) if remote_resource is not None: try: remote_resource = decode_resource(remote_resource) except (TypeError, UnicodeError): remote_resource = None xmpp_leg_uri = FrozenURI(request_uri.user, request_uri.host, remote_resource) instance._xmpp_identity = Identity(xmpp_leg_uri) instance.sip_session = session instance._start_outgoing_jingle_session(streams) return instance @classmethod def new_from_jingle_session(cls, session): - proposed_stream_types = set([stream.type for stream in session.proposed_streams]) + proposed_stream_types = set(stream.type for stream in session.proposed_streams) streams = [] for stream_type in proposed_stream_types: try: klass = SIPMediaStreamRegistry.get(stream_type) except Exception: continue streams.append(klass()) if not streams: session.reject('unsupported-applications') return None session.send_ring_indication() instance = cls() NotificationCenter().add_observer(instance, sender=session) instance._xmpp_identity = session.remote_identity instance._sip_identity = session.local_identity instance.jingle_session = session instance._start_outgoing_sip_session(streams) return instance @property def sip_identity(self): return self._sip_identity @property def xmpp_identity(self): return self._xmpp_identity @property def started(self): return self.__dict__['started'] @started.setter def started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('MediaSessionHandlerDidStart', sender=self) @run_in_green_thread def _start_outgoing_sip_session(self, streams): notification_center = NotificationCenter() # self.xmpp_identity is our local identity on the SIP side from_uri = self.xmpp_identity.uri.as_sip_uri() from_uri.parameters.pop('gr', None) # no GRUU in From header to_uri = self.sip_identity.uri.as_sip_uri() to_uri.parameters.pop('gr', None) # no GRUU in To header # TODO: need to fix GRUU in the proxy #contact_uri = self.xmpp_identity.uri.as_sip_uri() #contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('MedialSessionHandlerDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) self.sip_session.connect(from_header, to_header, route=route, streams=streams) @run_in_green_thread def _start_outgoing_jingle_session(self, streams): if self.xmpp_identity.uri.resource is not None: self.sip_session.reject() return xmpp_manager = XMPPManager() local_jid = self.sip_identity.uri.as_xmpp_jid() remote_jid = self.xmpp_identity.uri.as_xmpp_jid() # If this was an invitation to a conference, use the information in the Referred-By header if self.sip_identity.uri.host in xmpp_manager.muc_domains and self.sip_session.transfer_info and self.sip_session.transfer_info.referred_by: try: referred_by_uri = SIPURI.parse(self.sip_session.transfer_info.referred_by) except SIPCoreError: self.sip_session.reject(488) return else: inviter_uri = FrozenURI(referred_by_uri.user, referred_by_uri.host) local_jid = inviter_uri.as_xmpp_jid() # Use disco to gather potential JIDs to call d = xmpp_manager.disco_client_protocol.requestItems(remote_jid, sender=local_jid) try: items = block_on(d) except Exception: items = [] if not items: self.sip_session.reject(480) return # Check which items support Jingle valid = [] for item in items: d = xmpp_manager.disco_client_protocol.requestInfo(item.entity, nodeIdentifier=item.nodeIdentifier, sender=local_jid) try: info = block_on(d) except Exception: continue if jingle.NS_JINGLE in info.features and jingle.NS_JINGLE_APPS_RTP in info.features: valid.append(item.entity) if not valid: self.sip_session.reject(480) return # TODO: start multiple sessions? self._xmpp_identity = Identity(FrozenURI.parse(valid[0])) notification_center = NotificationCenter() if self.sip_identity.uri.host in xmpp_manager.muc_domains: self.jingle_session = JingleSession(xmpp_manager.jingle_coin_protocol) else: self.jingle_session = JingleSession(xmpp_manager.jingle_protocol) notification_center.add_observer(self, sender=self.jingle_session) self.jingle_session.connect(self.sip_identity, self.xmpp_identity, streams, is_focus=self.sip_session.remote_focus) def end(self): if self.ended: return notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) if self.sip_session.direction == 'incoming' and not self.started: self.sip_session.reject() else: self.sip_session.end() self.sip_session = None if self.jingle_session is not None: notification_center.remove_observer(self, sender=self.jingle_session) if self.jingle_session.direction == 'incoming' and not self.started: self.jingle_session.reject() else: self.jingle_session.end() self.jingle_session = None self.ended = True if self.started: notification_center.post_notification('MediaSessionHandlerDidEnd', sender=self) else: notification_center.post_notification('MediaSessionHandlerDidFail', sender=self) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.info("SIP session %s started" % self.sip_session.call_id) if self.sip_session.direction == 'outgoing': # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.jingle_session.accept(self.jingle_session.proposed_streams, is_focus=self.sip_session.remote_focus) else: # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.sip_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) def _NH_SIPSessionDidEnd(self, notification): log.info("SIP session %s ended" % self.sip_session.call_id) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionDidFail(self, notification): log.info("SIP session %s failed (%s)" % (self.sip_session.call_id, notification.data.reason)) notification.center.remove_observer(self, sender=self.sip_session) self.sip_session = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_SIPSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.jingle_session.hold() else: self.jingle_session.unhold() def _NH_SIPSessionGotConferenceInfo(self, notification): self.jingle_session._send_conference_info(notification.data.conference_info.toxml()) def _NH_JingleSessionDidStart(self, notification): log.info("Jingle session %s started" % notification.sender.id) if self.jingle_session.direction == 'incoming': # Both sessions have been accepted now self.started = True try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) else: # Time to accept the Jingle session and bridge them together try: audio_stream = next(stream for stream in self.jingle_session.streams if stream.type=='audio') except StopIteration: pass else: self._audio_bidge.add(audio_stream) self.sip_session.accept(self.sip_session.proposed_streams) def _NH_JingleSessionDidEnd(self, notification): log.info("Jingle session %s ended" % notification.sender.id) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidFail(self, notification): log.info("Jingle session %s failed (%s)" % (notification.sender.id, notification.data.reason)) notification.center.remove_observer(self, sender=self.jingle_session) self.jingle_session = None self.end() def _NH_JingleSessionDidChangeHoldState(self, notification): if notification.data.originator == 'remote': if notification.data.on_hold: self.sip_session.hold() else: self.sip_session.unhold() diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index 9ff5596..05897a0 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,319 +1,319 @@ from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from twisted.internet import reactor from wokkel.disco import DiscoClientProtocol from wokkel.generic import FallbackHandler, VersionHandler from wokkel.ping import PingHandler from wokkel.server import ServerService, XMPPS2SServerFactory from zope.interface import implements from sylk import __version__ as SYLK_VERSION from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp.jingle.session import JingleSession, JingleSessionManager from sylk.applications.xmppgateway.xmpp.protocols import DiscoProtocol, JingleProtocol, MessageProtocol, MUCServerProtocol, MUCPresenceProtocol, PresenceProtocol from sylk.applications.xmppgateway.xmpp.server import SylkInternalComponent, SylkRouter from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False self.domains = set(config.domains) - self.muc_domains = set(['%s.%s' % (config.muc_prefix, domain) for domain in self.domains]) + self.muc_domains = set('%s.%s' % (config.muc_prefix, domain) for domain in self.domains) router = SylkRouter() self._server_service = ServerService(router) self._server_service.domains = self.domains | self.muc_domains self._server_service.logTraffic = False # done manually self._s2s_factory = XMPPS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually # Setup internal components self._internal_component = SylkInternalComponent(router) self._internal_component.domains = self.domains self._internal_component.manager = self self._muc_component = SylkInternalComponent(router) self._muc_component.domains = self.muc_domains self._muc_component.manager = self # Setup protocols self.message_protocol = MessageProtocol() self.message_protocol.setHandlerParent(self._internal_component) self.presence_protocol = PresenceProtocol() self.presence_protocol.setHandlerParent(self._internal_component) self.disco_protocol = DiscoProtocol() self.disco_protocol.setHandlerParent(self._internal_component) self.disco_client_protocol = DiscoClientProtocol() self.disco_client_protocol.setHandlerParent(self._internal_component) self.muc_protocol = MUCServerProtocol() self.muc_protocol.setHandlerParent(self._muc_component) self.muc_presence_protocol = MUCPresenceProtocol() self.muc_presence_protocol.setHandlerParent(self._muc_component) self.disco_muc_protocol = DiscoProtocol() self.disco_muc_protocol.setHandlerParent(self._muc_component) self.version_protocol = VersionHandler('SylkServer', SYLK_VERSION) self.version_protocol.setHandlerParent(self._internal_component) self.fallback_protocol = FallbackHandler() self.fallback_protocol.setHandlerParent(self._internal_component) self.fallback_muc_protocol = FallbackHandler() self.fallback_muc_protocol.setHandlerParent(self._muc_component) self.ping_protocol = PingHandler() self.ping_protocol.setHandlerParent(self._internal_component) self.jingle_protocol = JingleProtocol() self.jingle_protocol.setHandlerParent(self._internal_component) self.jingle_coin_protocol = JingleProtocol() self.jingle_coin_protocol.setHandlerParent(self._muc_component) self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() self.muc_session_manager = XMPPMucSessionManager() self.subscription_manager = XMPPSubscriptionManager() self.jingle_session_manager = JingleSessionManager() def start(self): self.stopped = False # noinspection PyUnresolvedReferences self._s2s_listener = reactor.listenTCP(XMPPGatewayConfig.local_port, self._s2s_factory, interface=XMPPGatewayConfig.local_ip) listen_address = self._s2s_listener.getHost() log.info("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() self.muc_session_manager.start() self.subscription_manager.start() self.jingle_session_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) notification_center.add_observer(self, sender=self._muc_component) self._internal_component.startService() self._muc_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() self.jingle_session_manager.stop() self.subscription_manager.stop() self.muc_session_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() self._muc_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) notification_center.remove_observer(self, sender=self._muc_component) def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) def send_muc_stanza(self, stanza): if self.stopped: return self._muc_component.send(stanza.to_xml_element()) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification.center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification.center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification.center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) # Process muc stanzas def _NH_XMPPMucGotGroupChat(self, notification): message = notification.data.message muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)] except KeyError: # Ignore groupchat messages if there was no session created pass else: session.channel.send(message) def _NH_XMPPMucGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if not stanza.sender.uri.resource: return muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)] except KeyError: if stanza.available: notification.center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza)) else: notification.center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza)) else: session.channel.send(stanza) def _NH_XMPPMucGotInvitation(self, notification): invitation = notification.data.invitation data = NotificationData(sender=invitation.sender, recipient=invitation.recipient, participant=invitation.invited_user) notification.center.post_notification('XMPPGotMucAddParticipantRequest', sender=self, data=data) # Jingle def _NH_XMPPGotJingleSessionInitiate(self, notification): stanza = notification.data.stanza try: self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: session = JingleSession(notification.data.protocol) session.init_incoming(stanza) session.send_ring_indication() def _NH_XMPPGotJingleSessionTerminate(self, notification): stanza = notification.data.stanza try: session = self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: return session.handle_notification(notification) def _NH_XMPPGotJingleSessionInfo(self, notification): stanza = notification.data.stanza try: session = self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: return session.handle_notification(notification) def _NH_XMPPGotJingleSessionAccept(self, notification): stanza = notification.data.stanza try: session = self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: return session.handle_notification(notification) def _NH_XMPPGotJingleDescriptionInfo(self, notification): stanza = notification.data.stanza try: session = self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: return session.handle_notification(notification) def _NH_XMPPGotJingleTransportInfo(self, notification): stanza = notification.data.stanza try: session = self.jingle_session_manager.sessions[stanza.jingle.sid] except KeyError: return session.handle_notification(notification) diff --git a/sylk/interfaces/sipthor.py b/sylk/interfaces/sipthor.py index 6490252..bdfbc7d 100644 --- a/sylk/interfaces/sipthor.py +++ b/sylk/interfaces/sipthor.py @@ -1,110 +1,110 @@ from application import log from application.notification import NotificationCenter, NotificationData from application.python.types import Singleton from eventlib.twistedutil import block_on, callInGreenThread from gnutls.interfaces.twisted import TLSContext, X509Credentials from twisted.internet import defer from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.scheduler import KeepRunning import sylk from sylk.configuration import SIPConfig, ThorNodeConfig __all__ = 'ConferenceNode', class ConferenceNode(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): pass def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True available_connectors = set(c for c in self.connectors if not c.failed) if not available_connectors: NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self) def start(self, roles): # Needs to be called from a green thread log.info('Publishing %s roles to SIPThor' % roles) self.node = ThorEntity(SIPConfig.local_ip.normalized, roles, version=sylk.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True tls_context = TLSContext(credentials) EventServiceClient.__init__(self, ThorNodeConfig.domain, tls_context) def stop(self): # Needs to be called from a green thread self._shutdown() def _monitor_event_servers(self): def wrapped_func(): servers = self._get_event_servers() self._update_event_servers(servers) callInGreenThread(wrapped_func) return KeepRunning def _disconnect_all(self): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) block_on(d) self._disconnect_all() def handle_event(self, event): #print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role updated = False for role in self.node.roles + ('sip_proxy',): try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network - new_nodes = set([node.ip for node in role_map.get(role, [])]) + new_nodes = set(node.ip for node in role_map.get(role, [])) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.info("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) updated = True if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.info("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) updated = True if updated: NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=NotificationData(networks=self.networks))