diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index a6baae9..b693438 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,243 +1,247 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import re from application import log from application.notification import IObserver, NotificationCenter from application.python.util import Null, Singleton from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup from sipsimple.streams import AudioStream from twisted.internet import reactor from zope.interface import implements from sylk.applications import ISylkApplication, sylk_application from sylk.applications.conference.configuration import ConferenceConfig from sylk.applications.conference.room import Room from sylk.configuration import SIPConfig from sylk.extensions import ChatStream from sylk.session import ServerSession # Initialize database from sylk.applications.conference import database @sylk_application class ConferenceApplication(object): __metaclass__ = Singleton implements(ISylkApplication, IObserver) __appname__ = 'conference' def __init__(self): self.rooms = set() self.pending_sessions = [] def incoming_session(self, session): log.msg('New incoming session from %s' % session.remote_identity.uri) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject(488) return self.pending_sessions.append(session) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): to_header = data.headers.get('To', Null) if to_header is Null: subscribe_request.reject(400) return room = Room.get_room(data.request_uri) if not room.started: room = Room.get_room(to_header.uri) if not room.started: subscribe_request.reject(480) return room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_sip_message(self, message_request, data): if not ConferenceConfig.enable_sip_message: return room = Room.get_room(data.request_uri) if not room.started: message_request.answer(480) return room.handle_incoming_sip_message(message_request, data) def accept_session(self, session, streams): if session in self.pending_sessions: session.accept(streams, is_focus=True) def add_participant(self, session, room_uri): log.msg('New outgoing session to %s' % session.remote_identity.uri) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) room = Room.get_room(room_uri) room.start() room.add_session(session) self.rooms.add(room) def remove_participant(self, participant_uri, room_uri): room = Room.get_room(room_uri) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender self.pending_sessions.remove(session) room = Room.get_room(session._invitation.request_uri) # FIXME room.start() room.add_session(session) self.rooms.add(room) def _NH_SIPSessionDidEnd(self, notification): session = notification.sender log.msg('Session from %s ended' % session.remote_identity.uri) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) if session.direction == 'incoming': room = Room.get_room(session._invitation.request_uri) # FIXME else: room = Room.get_room(session.local_identity.uri) if session in room.sessions: # We could get this notifiction even if we didn't get SIPSessionDidStart room.remove_session(session) if room.empty: room.stop() try: self.rooms.remove(room) except KeyError: pass def _NH_SIPSessionDidFail(self, notification): session = notification.sender self.pending_sessions.remove(session) log.msg('Session from %s failed' % session.remote_identity.uri) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.headers.get('To').uri self.refer_to_uri = data.headers.get('Refer-To').uri self.method = data.headers.get('Refer-To').parameters.get('method', 'invite').lower() self.session = None self.streams = [] def start(self): if not re.match('^(sip:|sips:).*', self.refer_to_uri): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'invite': self._refer_request.accept() settings = SIPSimpleSettings() lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(self.refer_to_uri, settings.sip.transport_list) elif self.method == 'bye': self._refer_request.accept() try: conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) except RoomError: self._refer_request.end(404) else: self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = AccountManager().default_account self.streams.append(AudioStream(account)) self.streams.append(ChatStream(account)) self.session = ServerSession(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') + if original_from_header.display_name: + original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) + else: + original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri)) to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) - extra_headers.append(Header('Subject', 'Invitation to conference from: %s' % original_from_header.uri)) - self.session.connect(from_header, to_header, contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, extra_headers=extra_headers) + subject = u'Join conference request from: %s' % original_identity + self.session.connect(from_header, to_header, contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, subject=subject, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code) def _NH_SIPSessionDidStart(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead NotificationCenter().remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): NotificationCenter().remove_observer(self, sender=notification.sender) self._refer_request = None self._refer_headers = None diff --git a/sylk/session.py b/sylk/session.py index a5b3b00..4297a8f 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,213 +1,216 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # from datetime import datetime from application.notification import NotificationCenter from application.python.util import Null from eventlet import api from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Invitation, SIPCoreError, sip_status_messages -from sipsimple.core import RouteHeader +from sipsimple.core import RouteHeader, SubjectHeader from sipsimple.core import SDPConnection, SDPSession from sipsimple.session import Session, InvitationDidFailError, MediaStreamDidFailError, transition_state from sipsimple.threading.green import run_in_green_thread from sipsimple.util import TimestampedNotificationData class ServerSession(Session): @transition_state(None, 'connecting') @run_in_green_thread - def connect(self, from_header, to_header, contact_header, routes, streams, is_focus=False, extra_headers=[]): + def connect(self, from_header, to_header, contact_header, routes, streams, is_focus=False, subject=None, extra_headers=[]): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header + self.__dict__['subject'] = subject self.conference = Null notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, TimestampedNotificationData(streams=streams)) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_ip = contact_header.uri.host local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) stun_addresses = [] for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(for_offer=True) local_sdp.media.append(media) stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S '))) if stun_addresses: local_sdp.connection.address = stun_addresses[0] route_header = RouteHeader(self.route.get_uri()) if is_focus: contact_header.parameters['isfocus'] = None + if self.subject: + extra_headers.append(SubjectHeader(self.subject)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=received_code, reason=received_reason, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData()) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDidFailError(notification.sender, notification.data) except api.TimeoutError: self.greenlet = None self.end() return notification_center.post_notification('SIPSessionWillStart', self, TimestampedNotificationData()) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData()) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDidFailError(notification.sender, notification.data) except (MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=received_code, reason=received_reason, error=error) except InvitationDidFailError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator=e.data.originator)) if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = datetime.now() notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=received_code, reason=received_reason, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification_center.post_notification('SIPSessionDidStart', self, TimestampedNotificationData(streams=self.streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold()