diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py index 1dd11cb..c92e822 100644 --- a/sylk/applications/__init__.py +++ b/sylk/applications/__init__.py @@ -1,130 +1,145 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # __all__ = ['ISylkApplication', 'ApplicationRegistry', 'sylk_application', 'IncomingRequestHandler'] import os from application import log from application.notification import IObserver, NotificationCenter from application.python.util import Null, Singleton from sipsimple.threading import run_in_twisted_thread from zope.interface import Attribute, Interface, implements from sylk.configuration import ServerConfig class ISylkApplication(Interface): """ Interface defining attributes and methods any application must implement. Each application must be a Singleton and has to be decorated with the @sylk_application decorator. """ __appname__ = Attribute("Application name") def incoming_session(self, session): pass def incoming_subscription(self, subscribe_request, data): pass + def incoming_referral(self, refer_request, data): + pass + def incoming_sip_message(self, message_request, data): pass class ApplicationRegistry(object): __metaclass__ = Singleton def __init__(self): self.applications = [] def __iter__(self): return iter(self.applications) def add(self, app): if app not in self.applications: self.applications.append(app) def sylk_application(cls): """Class decorator for adding applications to the ApplicationRegistry""" ApplicationRegistry().add(cls()) return cls def load_applications(): toplevel = os.path.dirname(__file__) app_list = ['sylk.applications.%s' % item for item in os.listdir(toplevel) if os.path.isdir(os.path.join(toplevel, item)) and '__init__.py' in os.listdir(os.path.join(toplevel, item))] map(__import__, app_list) class IncomingRequestHandler(object): """ Handle incoming requests and match them to applications. """ __metaclass__ = Singleton implements(IObserver) # TODO: implement a 'find_application' function which will get the appropriate application # as defined in the configuration # TODO: apply ACLs (before or after?) def __init__(self): load_applications() log.msg('Loaded applications: %s' % ', '.join([app.__appname__ for app in ApplicationRegistry()])) self.application_map = dict((item.split(':')) for item in ServerConfig.application_map) def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='SIPSessionNewIncoming') notification_center.add_observer(self, name='SIPIncomingSubscriptionGotSubscribe') + notification_center.add_observer(self, name='SIPIncomingReferralGotRefer') notification_center.add_observer(self, name='SIPIncomingRequestGotRequest') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='SIPSessionNewIncoming') notification_center.remove_observer(self, name='SIPIncomingSubscriptionGotSubscribe') + notification_center.remove_observer(self, name='SIPIncomingReferralGotRefer') notification_center.remove_observer(self, name='SIPIncomingRequestGotRequest') @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionNewIncoming(self, notification): session = notification.sender application = self.application_map.get(session._invitation.request_uri.user, ServerConfig.default_application) try: app = (app for app in ApplicationRegistry() if app.__appname__ == application).next() except StopIteration: log.error('Application %s is not loaded' % application) else: app.incoming_session(session) def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification): subscribe_request = notification.sender application = self.application_map.get(notification.data.request_uri.user, ServerConfig.default_application) try: app = (app for app in ApplicationRegistry() if app.__appname__ == application).next() except StopIteration: log.error('Application %s is not loaded' % application) else: app.incoming_subscription(subscribe_request, notification.data) + def _NH_SIPIncomingReferralGotRefer(self, notification): + refer_request = notification.sender + application = self.application_map.get(notification.data.request_uri.user, ServerConfig.default_application) + try: + app = (app for app in ApplicationRegistry() if app.__appname__ == application).next() + except StopIteration: + log.error('Application %s is not loaded' % application) + else: + app.incoming_referral(refer_request, notification.data) + def _NH_SIPIncomingRequestGotRequest(self, notification): request = notification.sender if notification.data.method != 'MESSAGE': request.answer(405) return application = self.application_map.get(notification.data.request_uri.user, ServerConfig.default_application) try: app = (app for app in ApplicationRegistry() if app.__appname__ == application).next() except StopIteration: log.error('Application %s is not loaded' % application) else: app.incoming_sip_message(request, notification.data) diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index 9a687b2..b9c128b 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,103 +1,243 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # +import re + from application import log from application.notification import IObserver, NotificationCenter from application.python.util import Null, Singleton +from sipsimple.account import AccountManager +from sipsimple.configuration.settings import SIPSimpleSettings +from sipsimple.core import SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader +from sipsimple.lookup import DNSLookup +from sipsimple.streams import AudioStream from twisted.internet import reactor from zope.interface import implements from sylk.applications import ISylkApplication, sylk_application from sylk.applications.conference.configuration import ConferenceConfig -from sylk.applications.conference.room import Room +from sylk.applications.conference.room import Room, RoomError +from sylk.configuration import SIPConfig +from sylk.extensions import ChatStream +from sylk.session import ServerSession # Initialize database from sylk.applications.conference import database @sylk_application class ConferenceApplication(object): __metaclass__ = Singleton implements(ISylkApplication, IObserver) __appname__ = 'conference' def __init__(self): self.rooms = set() self.pending_sessions = [] def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject(488) return self.pending_sessions.append(session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): to_header = data.headers.get('To', Null) if to_header is Null: subscribe_request.reject(400) return room = Room.get_room(data.request_uri) if not room.started: room = Room.get_room(to_header.uri) if not room.started: subscribe_request.reject(480) return room.handle_incoming_subscription(subscribe_request, data) + def incoming_referral(self, refer_request, data): + from_header = data.headers.get('From', Null) + to_header = data.headers.get('To', Null) + refer_to_header = data.headers.get('Refer-To', Null) + if Null in (from_header, to_header, refer_to_header): + refer_request.reject(400) + return + referral_handler = IncomingReferralHandler(refer_request, data) + referral_handler.start() + def incoming_sip_message(self, message_request, data): if not ConferenceConfig.enable_sip_message: return room = Room.get_room(data.request_uri) if not room.started: message_request.answer(480) return room.handle_incoming_sip_message(message_request, data) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) + def add_participant(self, session, room_uri): + log.msg('New outgoing session to %s' % session.remote_identity.uri) + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=session) + room = Room.get_room(room_uri) + room.start() + room.add_session(session) + self.rooms.add(room) + + def remove_participant(self, participant_uri, room_uri): + room = Room.get_room(room_uri) + room.terminate_sessions(participant_uri) + def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = Room.get_room(session._invitation.request_uri) # FIXME room.start() room.add_session(session) self.rooms.add(room) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) - room = Room.get_room(session._invitation.request_uri) # FIXME + if session.direction == 'incoming': + room = Room.get_room(session._invitation.request_uri) # FIXME + else: + room = Room.get_room(session.local_identity.uri) if session in room.sessions: # We could get this notifiction even if we didn't get SIPSessionDidStart room.remove_session(session) if room.empty: room.stop() try: self.rooms.remove(room) except KeyError: pass def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) +class IncomingReferralHandler(object): + implements(IObserver) + + def __init__(self, refer_request, data): + self._refer_request = refer_request + self._refer_headers = data.headers + self.room_uri = data.headers.get('To').uri + self.refer_to_uri = data.headers.get('Refer-To').uri + self.method = data.headers.get('Refer-To').parameters.get('method', 'invite').lower() + self.session = None + self.streams = [] + + def start(self): + if not re.match('^(sip:|sips:).*'): + self.refer_to_uri = 'sip:%s' % self.refer_to_uri + try: + self.refer_to_uri = SIPURI.parse(self.refer_to_uri) + except SIPCoreError: + self._refer_request.reject(488) + return + notification_center = NotificationCenter() + notification_center.add_observer(self, sender=self._refer_request) + if self.method == 'invite': + self._refer_request.accept() + settings = SIPSimpleSettings() + lookup = DNSLookup() + notification_center.add_observer(self, sender=lookup) + lookup.lookup_sip_proxy(self.refer_to_uri, settings.sip.transport_list) + elif self.method == 'bye': + self._refer_request.accept() + try: + conference_application = ConferenceApplication() + conference_application.remove_participant(self.refer_to_uri, self.room_uri) + except RoomError: + self._refer_request.end(404) + else: + self._refer_request.end(200) + else: + self._refer_request.reject(488) + + def handle_notification(self, notification): + handler = getattr(self, '_NH_%s' % notification.name, Null) + handler(notification) + + def _NH_DNSLookupDidSucceed(self, notification): + notification_center = NotificationCenter() + notification_center.remove_observer(self, sender=notification.sender) + account = AccountManager().default_account + self.streams.append(AudioStream(account)) + self.streams.append(ChatStream(account)) + self.session = ServerSession(account) + notification_center.add_observer(self, sender=self.session) + original_from_header = self._refer_headers.get('From') + from_header = FromHeader(SIPURI.new(self.room_uri)) + to_header = ToHeader(self.refer_to_uri) + transport = notification.data.result[0].transport + parameters = {} if transport=='udp' else {'transport': transport} + contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) + extra_headers = [] + if self._refer_headers.get('Referred-By', None) is not None: + extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) + extra_headers.append(Header('Subject', 'Invitation to conference from: %s' % original_from_header.uri)) + self.session.connect(from_header, to_header, contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, extra_headers=extra_headers) + + def _NH_DNSLookupDidFail(self, notification): + NotificationCenter().remove_observer(self, sender=notification.sender) + + def _NH_SIPSessionGotRingIndication(self, notification): + if self._refer_request is not None: + self._refer_request.send_notify(180) + + def _NH_SIPSessionGotProvisionalResponse(self, notification): + if self._refer_request is not None: + self._refer_request.send_notify(notification.data.code) + + def _NH_SIPSessionDidStart(self, notification): + NotificationCenter().remove_observer(self, sender=notification.sender) + if self._refer_request is not None: + self._refer_request.end(200) + conference_application = ConferenceApplication() + conference_application.add_participant(self.session, self.room_uri) + self.session = None + self.streams = [] + + def _NH_SIPSessionDidFail(self, notification): + NotificationCenter().remove_observer(self, sender=notification.sender) + if self._refer_request is not None: + self._refer_request.end(notification.data.code or 500) + self.session = None + self.streams = [] + + def _NH_SIPSessionDidEnd(self, notification): + # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead + NotificationCenter().remove_observer(self, sender=notification.sender) + if self._refer_request is not None: + self._refer_request.end(200) + self.session = None + self.streams = [] + + def _NH_SIPIncomingReferralDidEnd(self, notification): + NotificationCenter().remove_observer(self, sender=notification.sender) + self._refer_request = None + self._refer_headers = None + diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 97caaa5..e8264a2 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,650 +1,656 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import random from datetime import datetime from glob import glob from itertools import cycle from time import mktime from application import log from application.notification import IObserver, NotificationCenter from application.python.util import Null, Singleton from eventlet import coros, proc from sipsimple.application import SIPApplication from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import FromHeader, ToHeader, RouteHeader, SIPURI, Message, SIPCoreError, SIPCoreInvalidStateError from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads.conference import Conference, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage from sipsimple.payloads.iscomposing import IsComposingMessage, State, LastActive, Refresh, ContentType from sipsimple.streams.applications.chat import CPIMIdentity, CPIMMessage, CPIMParserError from sipsimple.streams.msrp import ChatStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import Timestamp from twisted.internet import reactor from zope.interface import implements from sylk.applications.conference import database from sylk.applications.conference.configuration import ConferenceConfig from sylk.configuration.datatypes import ResourcePath def format_identity(identity, cpim_format=False): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) elif cpim_format: return u'' % (uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type def format_identity_with_stream_types(identity, streams): return '%s %s' % (format_identity(identity), format_stream_types(streams)) def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessage(object): def __init__(self, sender, recipient, content_type, body): self.sender = sender self.recipient = recipient self.content_type = content_type self.body = body self.timestamp = None class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ __metaclass__ = Singleton implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = CPIMIdentity.parse('' % self.uri) self.sessions = [] self.sessions_with_proposals = [] self.subscriptions = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None @classmethod def get_room(cls, uri): room_uri = '%s@%s' % (uri.user, uri.host) room = cls(room_uri) return room @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' def start(self): if self.state != 'stopped': return self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.initialize() self.state = 'started' def stop(self): if self.state != 'started': return self.state = 'stopped' self.message_dispatcher.kill(proc.ProcExit) self.moh_player.stop() self.moh_player = None self.audio_conference = None def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': if data.timestamp is not None and isinstance(data.timestamp, Timestamp): timestamp = datetime.fromtimestamp(mktime(data.timestamp.timetuple())) else: timestamp = datetime.now() if data.sender.uri != session.remote_identity.uri: return recipient = data.recipients[0] database.async_save_message(format_identity(session.remote_identity, True), self.uri, data.body, data.content_type, unicode(data.sender), unicode(recipient), timestamp) if recipient.uri == self.identity.uri: self.dispatch_message(session, data) else: self.dispatch_private_message(session, data) elif message_type == 'sip_message': database.async_save_message(format_identity(session.remote_identity, True), self.uri, data.body, data.content_type, unicode(data.sender), data.recipient, data.timestamp) self.dispatch_message(session, data) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: return recipient = data.recipients[0] if recipient.uri == self.identity.uri: self.dispatch_iscomposing(session, data) else: self.dispatch_private_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() chat_stream.send_message(message.body, message.content_type, local_identity=identity, recipients=[self.identity], timestamp=message.timestamp) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) except StopIteration: # This session doesn't have a chat stream, send him a SIP MESSAGE if ConferenceConfig.enable_sip_message: self.send_sip_message(session.remote_identity.uri, s.remote_identity.uri, message.content_type, message.body) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() chat_stream.send_message(message.body, message.content_type, local_identity=identity, recipients=[recipient], timestamp=message.timestamp) except ChatStreamError, e: log.error(u'Error dispatching private message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_iscomposing(self, session, data): for s in (s for s in self.sessions if s is not session): try: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching composing indication to %s: %s' % (s.remote_identity.uri, e)) except StopIteration: # This session doesn't have a chat stream, send him a SIP MESSAGE if ConferenceConfig.enable_sip_message: body = IsComposingMessage(state=State(data.state), refresh=Refresh(data.refresh), last_active=LastActive(data.last_active or datetime.now()), content_type=ContentType('text')).toxml() self.send_sip_message(session.remote_identity.uri, s.remote_identity.uri, IsComposingMessage.content_type, body) def dispatch_private_iscomposing(self, session, data): recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity) except ChatStreamError, e: log.error(u'Error dispatching private composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) except StopIteration: # This session doesn't have a chat stream, send him a SIP MESSAGE if ConferenceConfig.enable_sip_message: self.send_sip_message(self.identity.uri, session.remote_identity.uri, content_type, body) self_identity = format_identity(self.identity, cpim_format=True) database.async_save_message(self_identity, self.uri, body, content_type, self_identity, self_identity, datetime.now()) def dispatch_conference_info(self): data = self.build_conference_info_payload() for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(Conference.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass @run_in_green_thread def send_sip_message(self, from_uri, to_uri, content_type, body): lookup = DNSLookup() settings = SIPSimpleSettings() try: routes = lookup.lookup_sip_proxy(to_uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning(u'DNS lookup error while looking for %s proxy' % to_uri) else: route = routes.pop(0) from_header = FromHeader(self.identity.uri) to_header = ToHeader(SIPURI.new(to_uri)) route_header = RouteHeader(route.get_uri()) sender = CPIMIdentity(from_uri) for chunk in chunks(body, 1000): msg = CPIMMessage(chunk, content_type, sender=sender, recipients=[self.identity]) message_request = Message(from_header, to_header, route_header, 'message/cpim', str(msg)) message_request.send() def render_text_welcome(self, session): txt = 'Welcome to the conference.' user_count = len(set(str(s.remote_identity.uri) for s in self.sessions) - set([str(session.remote_identity.uri)])) if user_count == 0: txt += ' You are the first participant in the room.' else: if user_count == 1: txt += ' There is one more participant in the room.' else: txt += ' There are %s more participants in the room.' % user_count return txt def _play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) @run_in_green_thread def play_audio_welcome(self, session, welcome_prompt=True): audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_play=False, volume=50) audio_stream.bridge.add(player) if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self._play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self._play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is.wav').normalized self._play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self._play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self._play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self._play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self._play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self._play_file_in_player(player, file, 0) audio_stream.bridge.remove(player) self.audio_conference.add(audio_stream) self.audio_conference.unhold() if len(self.audio_conference.streams) == 1: # Play MoH self.moh_player.play() else: self.moh_player.pause() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) remote_identity = CPIMIdentity.parse(format_identity(session.remote_identity, cpim_format=True)) # getting last messages may take time, so new messages can arrive before messages the last message from history for msg in database.get_last_messages(self.uri, ConferenceConfig.replay_history): recipient = CPIMIdentity.parse(msg.cpim_recipient) sender = CPIMIdentity.parse(msg.cpim_sender) if recipient.uri in (self.identity.uri, remote_identity.uri) or sender.uri == remote_identity.uri: chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp) try: audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) self.play_audio_welcome(session) self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() try: chat_stream = (stream for stream in session.streams or [] if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams or [] if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.dispatch_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session))) + def terminate_sessions(self, uri): + if not self.started: + return + for session in (session for session in self.sessions if session.remote_identity.uri == uri): + session.end() + def handle_incoming_sip_message(self, message_request, data): content_type = data.headers.get('Content-Type', Null)[0] from_header = data.headers.get('From', Null) if content_type is Null or from_header is Null: message_request.answer(400) return try: # Take the first session which doesn't have a chat stream. This is needed because the # seession picked up here will later be ignored. It doesn't matter if we ignore a session # without a chat stream, because that means we will send SIP MESSAGE, and it will fork, so # everyone will get it. session = (session for session in self.sessions if str(session.remote_identity.uri) == str(from_header.uri) and any(stream for stream in session.streams if stream.type != 'chat')).next() except StopIteration: # MESSAGE from a user which is not in this room message_request.answer(503) return if content_type == 'message/cpim': try: message = CPIMMessage.parse(data.body) except CPIMParserError: message_request.answer(500) return else: body = message.body content_type = message.content_type sender = message.sender or format_identity(from_header, cpim_format=True) if message.timestamp is not None and isinstance(message.timestamp, Timestamp): timestamp = datetime.fromtimestamp(mktime(message.timestamp.timetuple())) else: timestamp = datetime.now() else: body = data.body sender = format_identity(from_header, cpim_format=True) timestamp = datetime.now() message_request.answer(200) if content_type == IsComposingMessage.content_type: return log.msg(u'New incoming MESSAGE from %s' % session.remote_identity.uri) self_identity = format_identity(self.identity, cpim_format=True) message = SIPMessage(sender=sender, recipient=self_identity, content_type=content_type, body=body) message.timestamp = timestamp self.incoming_message_queue.send((session, 'sip_message', message)) def build_conference_info_payload(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) host_info = HostInfo(web_page=WebPage('http://sylkserver.com')) self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users()) user_count = len(set(str(s.remote_identity.uri) for s in self.sessions)) self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True) users = Users() for session in self.sessions: try: user = (user for user in users if user.entity == str(session.remote_identity.uri)).next() except StopIteration: user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name) users.append(user) joining_info = JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected') endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status) for stream in session.streams: endpoint.append(Media(id(stream), media_type=format_conference_stream_type(stream))) user.append(endpoint) self.conference_info_payload.users = users return self.conference_info_payload.toxml() def handle_incoming_subscription(self, subscribe_request, data): content = self.build_conference_info_payload() notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscribe_request) subscribe_request.accept(Conference.content_type, content) self.subscriptions.append(subscribe_request) def accept_proposal(self, session, streams): if session in self.sessions_with_proposals: session.accept_proposal(streams) self.sessions_with_proposals.remove(session) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream._session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): data = notification.data.message session = notification.sender.session self.incoming_message_queue.send((session, 'message', data)) def _NH_ChatStreamGotComposingIndication(self, notification): data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) self.subscriptions.remove(subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.dispatch_conference_info() def _NH_SIPSessionGotProposal(self, notification): session = notification.sender audio_streams = [stream for stream in notification.data.streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] self.sessions_with_proposals.append(session) reactor.callLater(4, self.accept_proposal, session, streams) def _NH_SIPSessionGotRejectProposal(self, notification): session = notification.sender self.sessions_with_proposals.remove(session) def _NH_SIPSessionDidRenegotiateStreams(self, notification): notification_center = NotificationCenter() session = notification.sender streams = notification.data.streams if notification.data.action == 'add': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) remote_identity = CPIMIdentity.parse(format_identity(session.remote_identity, cpim_format=True)) # getting last messages may take time, so new messages can arrive before messages the last message from history for msg in database.get_last_messages(self.uri, ConferenceConfig.replay_history): recipient = CPIMIdentity.parse(msg.cpim_recipient) sender = CPIMIdentity.parse(msg.cpim_sender) if recipient.uri in (self.identity.uri, remote_identity.uri) or sender.uri == remote_identity.uri: chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp) log.msg(u'%s has added chat to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) log.msg(u'%s has added audio to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added audio' % format_identity(session.remote_identity), exclude=session) self.play_audio_welcome(session, False) elif notification.data.action == 'remove': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) log.msg(u'%s has removed chat from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() log.msg(u'%s has removed audio from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed audio' % format_identity(session.remote_identity), exclude=session) if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.dispatch_conference_info() class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = True self._player = None def initialize(self): files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_play=False, volume=20) self.conference.bridge.add(self._player) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._player) def stop(self): if self._player is None: return notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._player) self.conference.bridge.remove(self._player) self._player.stop() self._player = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() log.msg(u'Started playing music on hold') def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() log.msg(u'Stopped playing music on hold') def _play_next_file(self): self._player.filename = self.files.next() self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() def _NH_WavePlayerDidEnd(self, notification): if not self.paused: self._play_next_file() diff --git a/sylk/applications/ircconference/__init__.py b/sylk/applications/ircconference/__init__.py index 7c63df6..a73cd99 100644 --- a/sylk/applications/ircconference/__init__.py +++ b/sylk/applications/ircconference/__init__.py @@ -1,96 +1,99 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details # from application import log from application.notification import IObserver, NotificationCenter from application.python.util import Null, Singleton from twisted.internet import reactor from zope.interface import implements from sylk.applications import ISylkApplication, sylk_application from sylk.applications.ircconference.room import IRCRoom @sylk_application class IRCConferenceApplication(object): __metaclass__ = Singleton implements(ISylkApplication, IObserver) __appname__ = 'irc-conference' def __init__(self): self.rooms = set() self.pending_sessions = [] def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject(488) return self.pending_sessions.append(session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) if audio_streams: session.send_ring_indication() if chat_streams: # Disable private message capability chat_streams[0].chatroom_capabilities = [] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): to_header = data.headers.get('To', Null) if to_header is Null: subscribe_request.reject(400) return room = IRCRoom.get_room(data.request_uri) if not room.started: room = IRCRoom.get_room(to_header.uri) if not room.started: subscribe_request.reject(480) return room.handle_incoming_subscription(subscribe_request, data) + def incoming_referral(self, refer_request, data): + pass + def incoming_sip_message(self, message_request, data): pass def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = IRCRoom.get_room(session._invitation.request_uri) # FIXME room.start() room.add_session(session) self.rooms.add(room) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) room = IRCRoom.get_room(session._invitation.request_uri) # FIXME if session in room.sessions: # We could get this notifiction even if we didn't get SIPSessionDidStart room.remove_session(session) if room.empty: room.stop() try: self.rooms.remove(room) except KeyError: pass def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) diff --git a/sylk/server.py b/sylk/server.py index fb62d93..258b337 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,172 +1,173 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import sys from threading import Event from application import log from application.notification import NotificationCenter from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError from sipsimple.session import SessionManager from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from sylk.applications import IncomingRequestHandler from sylk.configuration import SIPConfig from sylk.configuration.backend import MemoryBackend from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = IncomingRequestHandler() self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryBackend()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager account_manager.default_account = account def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() notification_center.post_notification('SIPApplicationWillStart', sender=self, data=TimestampedNotificationData()) if self.state == 'stopping': reactor.stop() return account = account_manager.default_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle - events={"conference": ["application/conference-info+xml"]}, + events={"conference": ["application/conference-info+xml"], + "refer": ["message/sipfrag;version=2.0"]}, incoming_events=set(['conference']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server if account else False, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=TimestampedNotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, settings.audio.tail_length) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self, data=TimestampedNotificationData()) def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() self.request_handler.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback)