diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 04e571c..442a345 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1079 +1,1079 @@ import os import random import shutil import string import weakref from collections import defaultdict, deque from glob import glob from itertools import chain, count, cycle from application.notification import IObserver, NotificationCenter from application.python import Null from application.system import makedirs from eventlib import api, coros, proc from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.application import SIPApplication from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.msrp.chat import ChatIdentity, CPIMHeader, CPIMNamespace from sipsimple.streams.msrp.filetransfer import FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import ISOTimestamp from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.bonjour import BonjourService from sylk.configuration import ServerConfig, ThorNodeConfig from sylk.configuration.datatypes import URL from sylk.resources import Resources from sylk.session import Session, IllegalStateError from sylk.web import server as web_server def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s <%s@%s>' % (identity.display_name, uri.user, uri.host) else: return u'%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.room_uri = room.uri self.sender = sender self.filename = os.path.join(ConferenceConfig.screensharing_images_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) self.url = URL(web_server.url + '/conference/' + room.uri + '/screensharing') self.url.query_items['image'] = os.path.basename(self.filename) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError, e: log.msg('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url) room.dispatch_server_message(txt) log.msg(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender) room.dispatch_server_message(txt) log.msg(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.config = get_room_config(uri) self.uri = uri self.identity = ChatIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room') self.files = [] self.screen_images = {} self.sessions = [] self.subscriptions = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.conference_info_version = count(1) self.bonjour_services = Null self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = defaultdict(lambda: 0) self.history = deque(maxlen=ConferenceConfig.history_size) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) @property def conference_info(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) conference_description.conf_uris = conference.ConfUris() conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation')) if self.config.advertise_xmpp_support: conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation')) # TODO: add grouptextchat service uri for number in self.config.pstn_access_numbers: conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation')) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len(self.participants_counter.keys()) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) user = conference.User(str(session.remote_identity.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, 'OK') for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def start(self): if self.started: return if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference': room_user = self.identity.uri.user self.bonjour_services = BonjourService(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screensharing_images_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: continue if message.content.startswith('?OTR:'): continue if message.timestamp is None: message.timestamp = ISOTimestamp.utcnow() message.sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), message.sender.display_name) recipient = message.recipients[0] private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_message(session, message) else: self.history.append(message) self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: continue recipient = data.recipients[0] private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) def dispatch_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity, recipients=[self.identity]) def dispatch_private_iscomposing(self, session, data): identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name) recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_composing_indication(data.state, data.refresh, sender=identity) def dispatch_server_message(self, content, content_type='text/plain', exclude=None): ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: continue chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity], additional_headers=[message_type]) def dispatch_conference_info(self): data = self.conference_info for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = file.sender.uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = FileTransferHandler(self) handler.init_outgoing(uri, file) def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate, audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) if audio_stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if audio_stream.encryption.active: log.msg(u'Room %s - %s audio stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), audio_stream.encryption.type)) else: log.msg(u'Room %s - %s audio stream did not enable encryption' % (self.uri, format_identity(session.remote_identity))) try: transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: transfer_handler = FileTransferHandler(self) transfer_handler.init_incoming(transfer_stream) if transfer_stream.direction == 'recvonly': filename = os.path.basename(os.path.splitext(transfer_stream.file_selector.name)[0]) txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), filename,self.format_file_size(transfer_stream.file_selector.size)) else: filename = os.path.basename(transfer_stream.file_selector.name) txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), filename) log.msg(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, initial=True, session=session, streams=session.streams) welcome_handler.run() self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) else: log.msg(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] <= 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: next(stream for stream in session.streams if stream.type == 'file-transfer') except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.msg(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session))) if not self.sessions: log.msg(u'Room %s - Last participant left conference' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) if ServerConfig.enable_bonjour: self._update_bonjour_presence() def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def handle_incoming_subscription(self, subscribe_request, data): log.msg('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri)) if subscribe_request.event != 'conference': log.msg('Room %s - Subscription for event %s rejected: only conference event is supported' % (self.uri, subscribe_request.event)) subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) try: subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info) except SIPCoreError, e: log.warning('Error accepting SIP subscription: %s' % e) subscribe_request.end() def _accept_proposal(self, session, streams): try: session.accept_proposal(streams) except IllegalStateError: pass session.proposal_timer = None def add_file(self, file): self.dispatch_server_message('%s has uploaded file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) def _update_bonjour_presence(self): num = len(self.sessions) if num == 0: num_str = 'No' elif num == 1: num_str = 'One' elif num == 2: num_str = 'Two' else: num_str = str(num) txt = u'%s participant%s' % (num_str, '' if num==1 else 's') presence_state = BonjourPresenceState('available', txt) if self.bonjour_services is Null: # This is the room being published all the time from sylk.applications.conference import ConferenceApplication ConferenceApplication().bonjour_room_service.presence_state = presence_state else: self.bonjour_services.presence_state = presence_state @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): stream = notification.sender session = stream.session log.msg(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) def _NH_RTPStreamDidNotEnableEncryption(self, notification): stream = notification.sender session = stream.session log.msg(u'Room %s - %s %s stream did not enable encryption: %s' % (self.uri, format_identity(session.remote_identity), stream.type, notification.data.reason)) def _NH_RTPStreamZRTPReceivedSAS(self, notification): if not self.config.zrtp_auto_verify: return stream = notification.sender session = stream.session sas = notification.data.sas # Send ZRTP SAS over the chat stream, if available try: chat_stream = next(stream for stream in session.streams if stream.type=='chat') except StopIteration: return # Only send the message if there are no relays in between secure_chat = chat_stream.transport == 'tls' and all(len(path)==1 for path in (chat_stream.msrp.full_local_path, chat_stream.msrp.full_remote_path)) if secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') chat_stream.send_message(txt, 'text/plain', sender=self.identity, additional_headers=[message_type]) def _NH_RTPStreamDidTimeout(self, notification): stream = notification.sender if stream.type != 'audio': return session = stream.session log.msg(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity))) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith(('text/', 'image/')): stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') self.add_screen_image(message.sender, message.content) elif content_type == 'application/blink-zrtp-sas': if not self.config.zrtp_auto_verify: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return try: audio_stream = next(stream for stream in session.streams if stream.type=='audio' and stream.encryption.active and stream.encryption.type=='ZRTP') except StopIteration: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return # Only trust it if there was a direct path and the transport is TLS secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) remote_sas = str(message.content) if remote_sas == audio_stream.encryption.zrtp.sas and secure_chat: audio_stream.encryption.zrtp.verified = True stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') else: stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and (session not in self.session_nickname_map or self.session_nickname_map[session] != nickname): notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender try: self.subscriptions.remove(subscription) except ValueError: pass else: notification.center.remove_observer(self, sender=subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity))) else: log.msg(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity))) self.dispatch_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(3, self._accept_proposal, session, streams) old_timer = getattr(session, 'proposal_timer', None) assert old_timer is None session.proposal_timer = timer def _NH_SIPSessionProposalRejected(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None: timer.cancel() session.proposal_timer = None def _NH_SIPSessionHadProposalFailure(self, notification): if notification.data.originator == 'remote': session = notification.sender timer = getattr(session, 'proposal_timer', None) assert timer is not None timer.cancel() session.proposal_timer = None def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type) log.msg(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': log.msg(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate, stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) if stream.encryption.type != 'ZRTP': # We don't listen for stream notifications early enough if stream.encryption.active: log.msg(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, format_identity(session.remote_identity), stream.type, stream.encryption.type)) else: log.msg(u'Room %s - %s %s stream did not enable encryption' % (self.uri, format_identity(session.remote_identity), stream.type)) if notification.data.added_streams: welcome_handler = WelcomeHandler(self, initial=False, session=session, streams=notification.data.added_streams) welcome_handler.run() for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type) log.msg(u'Room %s - %s' % (self.uri, txt)) self.dispatch_server_message(txt, exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() if not session.streams: log.msg(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity))) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): log.msg(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri) notification.sender.reject_transfer(403) def _NH_SIPSessionWillEnd(self, notification): session = notification.sender timer = getattr(session, 'proposal_timer', None) if timer is not None and timer.active(): timer.cancel() session.proposal_timer = None @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % Resources.get('sounds/moh')) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20) self.paused = True self.conference.bridge.add(self._player) NotificationCenter().add_observer(self, sender=self._player) def stop(self): if self._player is None: return NotificationCenter().remove_observer(self, sender=self._player) self._player.stop() self.paused = True self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() def _play_next_file(self): self._player.filename = next(self.files) self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail class WelcomeHandler(object): implements(IObserver) def __init__(self, room, initial, session, streams): self.room = room self.initial = initial self.session = session self.streams = streams self.procs = proc.RunningProcSet() @run_in_green_thread def run(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) for stream in self.streams: if stream.type == 'audio': self.procs.spawn(self.audio_welcome, stream) elif stream.type == 'chat': self.procs.spawn(self.chat_welcome, stream) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.streams = None self.room = None self.procs = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def audio_welcome(self, stream): player = WavePlayer(stream.mixer, '', pause_time=1, initial_delay=1, volume=50) stream.bridge.add(player) try: if self.initial: file = Resources.get('sounds/co_welcome_conference.wav') self.play_file_in_player(player, file, 1) - user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) + user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - {str(self.session.remote_identity.uri)}) if user_count == 0: file = Resources.get('sounds/co_only_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = Resources.get('sounds/co_there_is_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = Resources.get('sounds/co_there_are.wav') self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = Resources.get('sounds/bi_%d.wav' % user_count) self.play_file_in_player(player, file, 0.1) else: file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/bi_%d.wav' % (user_count % 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/co_more_participants.wav') self.play_file_in_player(player, file, 0) file = Resources.get('sounds/connected_tone.wav') self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: stream.bridge.remove(player) self.room.audio_conference.add(stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() finally: player.stop() def chat_welcome(self, stream): if self.initial: txt = 'Welcome to SylkServer!' else: txt = '' - user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)])) + user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - {str(self.session.remote_identity.uri)}) if user_count == 0: txt += ' You are the first participant' else: if user_count == 1: txt += ' There is one more participant' else: txt += ' There are %s more participants' % user_count txt += ' in this conference room.' if not ServerConfig.enable_bonjour: if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers: txt += '\n\nOther participants can join at these addresses:\n\n' if self.room.config.pstn_access_numbers: if len(self.room.config.pstn_access_numbers) == 1: nums = self.room.config.pstn_access_numbers[0] else: nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1] txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums if self.room.config.advertise_xmpp_support: txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri if self.room.config.webrtc_gateway_url: txt += ' - Using a WebRTC enabled browser go to %s and join room %s\n' % (self.room.config.webrtc_gateway_url, self.room.identity.uri.user) stream.send_message(txt, 'text/plain', sender=self.room.identity, recipients=[self.room.identity]) for msg in self.room.history: stream.send_message(msg.content, msg.content_type, sender=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp) # Send ZRTP SAS over the chat stream, if applicable if self.room.config.zrtp_auto_verify: session = stream.session try: audio_stream = next(stream for stream in session.streams if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: # Only send the message if there are no relays in between secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path)) sas = audio_stream.encryption.zrtp.sas if sas is not None and secure_chat: txt = 'Received ZRTP Short Authentication String: %s' % sas # Don't set the remote identity, that way it will appear as a private message ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp') message_type = CPIMHeader('Message-Type', ns, 'status') stream.send_message(txt, 'text/plain', sender=self.room.identity, additional_headers=[message_type]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall() class RoomFile(object): def __init__(self, name, hash, size, sender): self.name = name self.hash = hash self.size = size self.sender = sender @property def file_selector(self): return FileSelector.for_file(self.name, hash=self.hash) class FileTransferHandler(object): implements(IObserver) def __init__(self, room): self.room = weakref.ref(room) self.session = None self.stream = None self.handler = None self.direction = None def init_incoming(self, stream): self.direction = 'incoming' self.stream = stream self.session = stream.session self.handler = stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) @run_in_green_thread def init_outgoing(self, destination, file): self.direction = 'outgoing' room = self.room() if room is None: return settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(destination) lookup = DNSLookup() try: route = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()[0] except (DNSLookupError, IndexError): return self.session = Session(account) self.stream = MediaStreamRegistry.get('file-transfer')(file.file_selector, 'sendonly') self.handler = self.stream.handler notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.stream) notification_center.add_observer(self, sender=self.handler) from_header = FromHeader(SIPURI.new(room.identity.uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(destination)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(file.sender.uri))) extra_headers.append(SubjectHeader(u'File uploaded by %s' % file.sender)) self.session.connect(from_header, to_header, route=route, streams=[self.stream], is_focus=True, extra_headers=extra_headers) def _terminate(self, failure_reason=None): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.handler) room = self.room() if room is not None: if failure_reason is None: if self.direction == 'incoming' and self.stream.direction == 'recvonly': sender = ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) file = RoomFile(self.stream.file_selector.name, self.stream.file_selector.hash, self.stream.file_selector.size, sender) room.add_file(file) else: room.dispatch_server_message('File transfer for %s failed: %s' % (os.path.basename(self.stream.file_selector.name), failure_reason)) self.session = None self.stream = None self.handler = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): self._terminate(failure_reason=notification.data.reason) def _NH_FileTransferHandlerDidEnd(self, notification): if self.direction == 'incoming': if self.stream.direction == 'sendonly': reactor.callLater(3, self.session.end) else: reactor.callLater(1, self.session.end) else: self.session.end() self._terminate(failure_reason=notification.data.reason) diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py index 5364992..e91670e 100644 --- a/sylk/applications/ircconference/room.py +++ b/sylk/applications/ircconference/room.py @@ -1,672 +1,672 @@ import random import urllib import lxml.html import lxml.html.clean from itertools import count from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from eventlib import coros, proc from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage from sipsimple.streams.msrp.chat import ChatIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from twisted.internet import protocol, reactor from twisted.words.protocols import irc from zope.interface import implements from sylk.applications.ircconference.configuration import get_room_configuration from sylk.applications.ircconference.logger import log from sylk.resources import Resources def format_identity(identity): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type def html2text(data): try: doc = lxml.html.document_fromstring(data) cleaner = lxml.html.clean.Cleaner(style=True) doc = cleaner.clean_html(doc) return doc.text_content().strip('\n') except Exception: return '' class IRCMessage(object): def __init__(self, username, uri, content, content_type='text/plain'): self.sender = ChatIdentity(uri, display_name=username) self.content = content self.content_type = content_type class IRCRoom(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ __metaclass__ = Singleton implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = ChatIdentity.parse('' % self.uri) self.sessions = [] self.sessions_with_proposals = [] self.subscriptions = [] self.pending_messages = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.conference_info_payload = None self.conference_info_version = count(1) self.irc_connector = None self.irc_protocol = None @classmethod def get_room(cls, uri): room_uri = '%s@%s' % (uri.user, uri.host) room = cls(room_uri) return room @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' def start(self): if self.state != 'stopped': return config = get_room_configuration(self.uri.split('@')[0]) factory = IRCBotFactory(config) host, port = config.server self.irc_connector = reactor.connectTCP(host, port, factory) NotificationCenter().add_observer(self, sender=self.irc_connector.factory) self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.state = 'started' def stop(self): if self.state != 'started': return self.state = 'stopped' NotificationCenter().remove_observer(self, sender=self.irc_connector.factory) self.irc_connector.factory.stop_requested = True self.irc_connector.disconnect() self.irc_connector = None self.message_dispatcher.kill(proc.ProcExit) self.audio_conference = None def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'msrp_message': if data.sender.uri != session.remote_identity.uri: return self.dispatch_message(session, data) elif message_type == 'irc_message': self.dispatch_irc_message(data) def dispatch_message(self, session, message): identity = ChatIdentity.parse(format_identity(session.remote_identity, True)) for s in (s for s in self.sessions if s is not session): try: chat_stream = next(stream for stream in s.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(message.content, message.content_type, sender=identity, recipients=[self.identity], timestamp=message.timestamp) def dispatch_irc_message(self, message): for session in self.sessions: try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity]) def dispatch_server_message(self, content, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity]) def get_conference_info(self): # Send request to get participants list, we'll get a notification with it if self.irc_protocol is not None: self.irc_protocol.get_participants() else: self.dispatch_conference_info([]) def dispatch_conference_info(self, irc_participants): data = self.build_conference_info_payload(irc_participants) for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def build_conference_info_payload(self, irc_participants): irc_configuration = get_room_configuration(self.uri.split('@')[0]) if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent) host_info = HostInfo(web_page=WebPage(irc_configuration.website)) self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users()) self.conference_info_payload.version = next(self.conference_info_version) user_count = len(set(str(s.remote_identity.uri) for s in self.sessions)) + len(irc_participants) self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True) users = Users() for session in self.sessions: try: user = next(user for user in users if user.entity == str(session.remote_identity.uri)) except StopIteration: user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name) users.add(user) joining_info = JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected') endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status) for stream in session.streams: endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream))) user.add(endpoint) for nick in irc_participants: irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0]) user = User(irc_uri, display_text=nick) users.add(user) endpoint = Endpoint(irc_uri, display_text=nick) endpoint.add(Media(random.randint(100000000, 999999999), media_type='message')) user.add(endpoint) self.conference_info_payload.users = users return self.conference_info_payload.toxml() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) try: chat_stream = next(stream for stream in session.streams if stream.type == 'chat') except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams if stream.type == 'audio') except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.get_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() try: chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat') except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio') except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.get_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session))) def accept_proposal(self, session, streams): if session in self.sessions_with_proposals: session.accept_proposal(streams) self.sessions_with_proposals.remove(session) def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) self.subscriptions.append(subscribe_request) try: subscribe_request.accept() except SIPCoreError, e: log.warning('Error accepting SIP subscription: %s' % e) subscribe_request.end() self.get_conference_info() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) self.subscriptions.remove(subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.get_conference_info() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': session = notification.sender audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return if chat_streams: chat_streams[0].chatroom_capabilities = [] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] self.sessions_with_proposals.append(session) reactor.callLater(4, self.accept_proposal, session, streams) def _NH_SIPSessionProposalRejected(self, notification): session = notification.sender self.sessions_with_proposals.remove(session) def _NH_SIPSessionDidRenegotiateStreams(self, notification): session = notification.sender for stream in notification.data.added_streams: notification.center.add_observer(self, sender=stream) log.msg(u'%s has added %s to %s' % (format_identity(session.remote_identity), stream.type, self.uri)) self.dispatch_server_message('%s has added %s' % (format_identity(session.remote_identity), stream.type), exclude=session) if stream.type == 'audio': log.msg(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (stream.codec, stream.sample_rate, stream.local_rtp_address, stream.local_rtp_port, stream.remote_rtp_address, stream.remote_rtp_port)) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) for stream in notification.data.removed_streams: notification.center.remove_observer(self, sender=stream) log.msg(u'%s has removed %s from %s' % (format_identity(session.remote_identity), stream.type, self.uri)) self.dispatch_server_message('%s has removed %s' % (format_identity(session.remote_identity), stream.type), exclude=session) if stream.type == 'audio': try: self.audio_conference.remove(stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.get_conference_info() def _NH_RTPStreamDidTimeout(self, notification): stream = notification.sender if stream.type != 'audio': return session = stream.session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender message = notification.data.message if message.content_type not in ('text/html', 'text/plain'): log.msg(u'Unsupported content type: %s, ignoring message' % message.content_type) stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message') return stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') # Send MSRP chat message to other participants session = stream.session self.incoming_message_queue.send((session, 'msrp_message', message)) # Send MSRP chat message to IRC chat room if message.content_type == 'text/html': content = html2text(message.content) elif message.content_type == 'text/plain': content = message.content else: log.warning('unexpected message type: %s' % message.content_type) return sender = message.sender irc_message = '%s: %s' % (format_identity(sender), content) if self.irc_protocol is not None: self.irc_protocol.send_message(irc_message.encode('utf-8')) else: self.pending_messages.append(irc_message) def _NH_ChatStreamGotNicknameRequest(self, notification): # Discard the nickname but pretend we accept it so that XMPP clients can work chunk = notification.data.chunk notification.sender.accept_nickname(chunk) def _NH_IRCBotGotConnected(self, notification): self.irc_protocol = notification.data.protocol # Send enqueued messages while self.pending_messages: message = self.pending_messages.pop(0) self.irc_protocol.send_message(message.encode('utf-8')) # Update participants list self.get_conference_info() def _NH_IRCBotGotDisconnected(self, notification): self.irc_protocol = None def _NH_IRCBotGotMessage(self, notification): message = notification.data.message self.incoming_message_queue.send((None, 'irc_message', message)) def _NH_IRCBotGotParticipantsList(self, notification): self.dispatch_conference_info(notification.data.participants) def _NH_IRCBotJoinedChannel(self, notification): self.get_conference_info() def _NH_IRCBotUserJoined(self, notification): self.dispatch_server_message('%s joined the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserLeft(self, notification): self.dispatch_server_message('%s left the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserQuit(self, notification): self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason)) self.get_conference_info() def _NH_IRCBotUserKicked(self, notification): data = notification.data self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason)) self.get_conference_info() def _NH_IRCBotUserRenamed(self, notification): self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname)) self.get_conference_info() def _NH_IRCBotUserAction(self, notification): self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action)) class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.proc = None @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.proc = proc.spawn(self.play_audio_welcome, welcome_prompt) self.proc.wait() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None self.proc = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio') except StopIteration: return player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50) audio_stream.bridge.add(player) try: if welcome_prompt: file = Resources.get('sounds/co_welcome_conference.wav') self.play_file_in_player(player, file, 1) - user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) + user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - {str(self.session.remote_identity.uri)}) if user_count == 0: file = Resources.get('sounds/co_only_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = Resources.get('sounds/co_there_is_one.wav') self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = Resources.get('sounds/co_there_are.wav') self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = Resources.get('sounds/bi_%d.wav' % user_count) self.play_file_in_player(player, file, 0.1) else: file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/bi_%d.wav' % (user_count % 10)) self.play_file_in_player(player, file, 0.1) file = Resources.get('sounds/co_more_participants.wav') self.play_file_in_player(player, file, 0) file = Resources.get('sounds/connected_tone.wav') self.play_file_in_player(player, file, 0.1) except proc.ProcExit: # No need to remove the bridge from the stream, it's done automatically pass else: audio_stream.bridge.remove(player) self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() finally: player.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.proc.kill() class IRCBot(irc.IRCClient): nickname = 'SylkServer' def __init__(self): self._nick_collector = [] self.nicks = [] def connectionMade(self): irc.IRCClient.connectionMade(self) log.msg('Connection to IRC has been established') NotificationCenter().post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self)) def connectionLost(self, failure): irc.IRCClient.connectionLost(self, failure) NotificationCenter().post_notification('IRCBotGotDisconnected', self.factory, NotificationData()) def signedOn(self): log.msg('Logging into %s channel...' % self.factory.channel) self.join(self.factory.channel) def kickedFrom(self, channel, kicker, message): log.msg('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message)) self.join(self.factory.channel) def joined(self, channel): log.msg('Logged into %s channel' % channel) NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, NotificationData(channel=self.factory.channel)) def privmsg(self, user, channel, message): if channel == '*': return username = user.split('!', 1)[0] if username == self.nickname: return if channel == self.nickname: self.msg(username, "Sorry, I don't support private messages, I'm a bot.") return uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0])) irc_message = IRCMessage(username, uri, message.decode('utf-8')) data = NotificationData(message=irc_message) NotificationCenter().post_notification('IRCBotGotMessage', self.factory, data) def send_message(self, message): self.say(self.factory.channel, message) def get_participants(self): self.sendLine("NAMES #%s" % self.factory.channel) def got_participants(self, nicks): data = NotificationData(participants=nicks) NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, data) def irc_RPL_NAMREPLY(self, prefix, params): """Collect usernames from this channel. Several of these messages may be sent to cover the channel's full nicklist. An RPL_ENDOFNAMES signals the end of the list. """ # We just separate these into individual nicks and stuff them in # the nickCollector, transferred to 'nicks' when we get the RPL_ENDOFNAMES. for name in params[3].split(): # Remove operator and voice prefixes if name[0] in '@+': name = name[1:] if name != self.nickname: self._nick_collector.append(name) def irc_RPL_ENDOFNAMES(self, prefix, params): """This is sent after zero or more RPL_NAMREPLY commands to terminate the list of users in a channel. """ self.nicks = self._nick_collector self._nick_collector = [] self.got_participants(self.nicks) def userJoined(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserJoined', self.factory, data) def userLeft(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserLeft', self.factory, data) def userQuit(self, user, reason): data = NotificationData(user=user, reason=reason) NotificationCenter().post_notification('IRCBotUserQuit', self.factory, data) def userKicked(self, kickee, channel, kicker, message): if channel.strip('#') == self.factory.channel: data = NotificationData(kickee=kickee, kicker=kicker, reason=message) NotificationCenter().post_notification('IRCBotUserKicked', self.factory, data) def userRenamed(self, oldname, newname): data = NotificationData(oldname=oldname, newname=newname) NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, data) def action(self, user, channel, data): if channel.strip('#') == self.factory.channel: username = user.split('!', 1)[0] data = NotificationData(user=username, action=data) NotificationCenter().post_notification('IRCBotUserAction', self.factory, data) class IRCBotFactory(protocol.ClientFactory): protocol = IRCBot def __init__(self, config): self.config = config self.channel = config.channel self.stop_requested = False def clientConnectionLost(self, connector, failure): log.msg('Disconnected from IRC: %s' % failure.getErrorMessage()) if not self.stop_requested: log.msg('Reconnecting...') connector.connect() def clientConnectionFailed(self, connector, failure): log.error('Connection to IRC server failed: %s' % failure.getErrorMessage()) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index 98766cc..51fe21f 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,520 +1,520 @@ import hashlib import random from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid, caps from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.util import format_uri from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription from sylk.configuration import SIPConfig __all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self) def add_sip_subscription(self, subscription): # If s subscription is received after the handle has ended but before # S2XPresenceHandlerDidEnd has been processed we need to ignore it and wait for a retransmission # which we will handle by creating a new S2XPresenceHandler if self.ended: return self._sip_subscriptions.append(subscription) NotificationCenter().add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None try: subscription.accept(content_type, pidf_doc) except SIPCoreError, e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() else: try: subscription.accept_pending() except SIPCoreError, e: log.warning('Error accepting SIP subscription: %s' % e) subscription.end() if XMPPGatewayConfig.log_presence: log.msg('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = next(self._stanza_cache.iterkeys()) person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': status.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: status.extended = 'available' if stanza.sender.uri.resource: resource = encode_resource(stanza.sender.uri.resource) else: # Workaround for clients not sending the resource under certain (unknown) circumstances resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest() service_id = "SID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource sip_uri.parameters['xmpp'] = None contact = pidf.Contact(str(sip_uri)) service = pidf.Service(service_id, status=status, contact=contact) service.add(pidf.DeviceID(resource)) service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource) service.timestamp = pidf.ServiceTimestamp(stanza.timestamp) service.capabilities = caps.ServiceCapabilities(text=True, message=True) for lang, note in stanza.statuses.iteritems(): service.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(service) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification.center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if XMPPGatewayConfig.log_presence: log.msg('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) if not self._sip_subscriptions: self.end() def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification): if XMPPGatewayConfig.log_presence: log.msg('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), notification.data.code, notification.data.reason)) def _NH_SIPIncomingSubscriptionGotUnsubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.msg('SIP subscription from %s to %s was terminated by user for presence flow 1x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionGotRefreshingSubscribe(self, notification): if XMPPGatewayConfig.log_presence: log.msg('SIP subscription from %s to %s was refreshed for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_SIPIncomingSubscriptionDidTimeout(self, notification): if XMPPGatewayConfig.log_presence: log.msg('SIP subscription from %s to %s timed out for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions))) def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza stanza.timestamp = ISOTimestamp.now() # TODO: mirror the one in the stanza, if present pidf_doc = self._build_pidf() if XMPPGatewayConfig.log_presence: log.msg('XMPP notification from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self))) for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) except SIPCoreError, e: if XMPPGatewayConfig.log_presence: log.msg('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e)) if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification.center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) except ParserError, e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = next(p for p in pidf_doc.persons) except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id else: sip_contact.parameters['gr'] = service.id sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' - elif set(('holiday', 'vacation')).intersection(activities): + elif {'holiday', 'vacation'}.intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = DefaultAccount() refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.uri), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if self._xmpp_subscription is None: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: if XMPPGatewayConfig.log_presence: log.msg('SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'))) self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail, e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) except SubscriptionError, e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe diff --git a/sylk/server.py b/sylk/server.py index d4d2414..32c4b6d 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,257 +1,257 @@ import os import sys from threading import Event from uuid import uuid4 from application import log from application.notification import NotificationCenter from application.python import Null from application.system import makedirs from eventlib import proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from sipsimple.video import VideoDevice from twisted.internet import reactor # Load stream extensions needed for integration with SIP SIMPLE SDK import sylk.streams del sylk.streams from sylk.accounts import DefaultAccount from sylk.applications import IncomingRequestHandler from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger from sylk.session import SessionManager from sylk.web import WebServer class SylkServer(SIPApplication): def __init__(self): self.request_handler = Null self.thor_interface = Null self.web_server = Null self.logger = Logger() self.stopping_event = Event() self.stop_event = Event() self.failed = False def start(self, options): self.options = options if self.options.enable_bonjour: ServerConfig.enable_bonjour = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: super(SylkServer, self).start(MemoryStorage()) except Exception, e: log.fatal("Error starting SIP Application: %s" % e) sys.exit(1) def _initialize_core(self): # SylkServer needs to listen for extra events and request types notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), enable_colorbar_device=True, # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, - incoming_events=set(['conference', 'presence']), - incoming_requests=set(['MESSAGE'])) + incoming_events={'conference', 'presence'}, + incoming_requests={'MESSAGE'}) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() dns_manager = DNSManager() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return # Initialize default account default_account = DefaultAccount() account_manager.default_account = default_account # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize video objects self.video_device = VideoDevice(u'Colorbar generator', settings.video.resolution, settings.video.framerate) # initialize instance id settings.instance_id = uuid4().urn settings.save() # initialize ZRTP cache makedirs(ServerConfig.spool_dir.normalized) self.engine.zrtp_cache = os.path.join(ServerConfig.spool_dir.normalized, 'zrtp.db') # initialize middleware components dns_manager.start() account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) # start SylkServer components self.web_server = WebServer() self.web_server.start() self.request_handler = IncomingRequestHandler() self.request_handler.start() if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode self.thor_interface = ConferenceNode() thor_roles = [] if 'conference' in self.request_handler.applications: thor_roles.append('conference_server') if 'xmppgateway' in self.request_handler.applications: thor_roles.append('xmpp_gateway') if 'webrtcgateway' in self.request_handler.applications: thor_roles.append('webrtc_gateway') self.thor_interface.start(thor_roles) @run_in_green_thread def _shutdown_subsystems(self): dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() # terminate all sessions p = proc.spawn(session_manager.stop) p.wait() # shutdown SylkServer components procs = [proc.spawn(self.web_server.stop), proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)] proc.waitall(procs) # shutdown other middleware components procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop)] proc.waitall(procs) # shutdown engine self.engine.stop() self.engine.join() # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _NH_AudioDevicesDidChange(self, notification): pass def _NH_DefaultAudioDeviceDidChange(self, notification): pass def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): self.stopping_event.set() def _NH_SIPApplicationDidEnd(self, notification): log.msg('SIP application ended') self.logger.stop() if not self.stopping_event.is_set(): log.warning('SIP application ended without shutting down all subsystems') self.stopping_event.set() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) self.failed = True def _NH_SIPEngineDidFail(self, notification): log.error('SIP engine failed') self.failed = True super(SylkServer, self)._NH_SIPEngineDidFail(notification) def _NH_ThorNetworkGotFatalError(self, notification): log.error("All Thor Event Servers have unrecoverable errors.")