diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py index 5dda11b..afc6eb3 100644 --- a/sylk/applications/conference/__init__.py +++ b/sylk/applications/conference/__init__.py @@ -1,438 +1,438 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details # import mimetypes import os import re import shutil from application.notification import IObserver, NotificationCenter from application.python import Null from gnutls.interfaces.twisted import X509Credentials from sipsimple.account.bonjour import BonjourPresenceState from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader, SubjectHeader from sipsimple.lookup import DNSLookup from sipsimple.session import IllegalStateError from sipsimple.streams import AudioStream from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications import SylkApplication from sylk.applications.conference.configuration import get_room_config, ConferenceConfig from sylk.applications.conference.logger import log from sylk.applications.conference.room import Room from sylk.applications.conference.web import ScreenSharingWebServer from sylk.bonjour import BonjourServices from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig -from sylk.extensions import ChatStream from sylk.session import Session +from sylk.streams import ChatStream from sylk.tls import Certificate, PrivateKey class ACLValidationError(Exception): pass class RoomNotFoundError(Exception): pass class ConferenceApplication(SylkApplication): implements(IObserver) def __init__(self): self._rooms = {} self.invited_participants_map = {} self.bonjour_focus_service = Null self.bonjour_room_service = Null self.screen_sharing_web_server = None def start(self): # cleanup old files for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir): try: shutil.rmtree(path) except EnvironmentError: pass if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference': self.bonjour_focus_service = BonjourServices(service='sipfocus') self.bonjour_focus_service.start() log.msg("Bonjour publication started for service 'sipfocus'") self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference') self.bonjour_room_service.start() self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants') log.msg("Bonjour publication started for service 'sipuri'") self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir) if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None: cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized) key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized) credentials = X509Credentials(cert, key) else: credentials = None self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials) listen_address = self.screen_sharing_web_server.listener.getHost() log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port)) def stop(self): self.bonjour_focus_service.stop() self.bonjour_room_service.stop() self.screen_sharing_web_server.stop() def get_room(self, uri, create=False): room_uri = '%s@%s' % (uri.user, uri.host) try: room = self._rooms[room_uri] except KeyError: if create: room = Room(room_uri) self._rooms[room_uri] = room return room else: raise RoomNotFoundError else: return room def remove_room(self, uri): room_uri = '%s@%s' % (uri.user, uri.host) self._rooms.pop(room_uri, None) def validate_acl(self, room_uri, from_uri): room_uri = '%s@%s' % (room_uri.user, room_uri.host) cfg = get_room_config(room_uri) if cfg.access_policy == 'allow,deny': if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri): return raise ACLValidationError else: if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri): raise ACLValidationError def incoming_session(self, session): log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri)) audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio'] chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat'] transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer'] if not audio_streams and not chat_streams and not transfer_streams: log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported') session.reject(488) return try: self.validate_acl(session._invitation.request_uri, session.remote_identity.uri) except ACLValidationError: log.msg(u'Session rejected: unauthorized by access list') session.reject(403) return # Check if requested files belong to this room for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'): try: room = self.get_room(session._invitation.request_uri) except RoomNotFoundError: log.msg(u'Session rejected: room not found') session.reject(404) return try: file = next(file for file in room.files if file.hash == stream.file_selector.hash) except StopIteration: log.msg(u'Session rejected: requested file not found') session.reject(404) return filename = os.path.basename(file.name) for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)): if filename in filenames: path = os.path.join(dirpath, filename) stream.file_selector.fd = open(path, 'r') if stream.file_selector.size is None: stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size if stream.file_selector.type is None: mime_type, encoding = mimetypes.guess_type(filename) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' stream.file_selector.type = type break else: # File got removed from the filesystem log.msg(u'Session rejected: requested file removed from the filesystem') session.reject(404) return NotificationCenter().add_observer(self, sender=session) if audio_streams: session.send_ring_indication() streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams] reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams) def incoming_subscription(self, subscribe_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) if Null in (from_header, to_header): subscribe_request.reject(400) return if subscribe_request.event != 'conference': log.msg(u'Subscription rejected: only conference event is supported') subscribe_request.reject(489) return try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: try: self.validate_acl(to_header.uri, from_header.uri) except ACLValidationError: # Check if we need to skip the ACL because this was an invited participant if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})): log.msg(u'Subscription rejected: unauthorized by access list') subscribe_request.reject(403) return try: room = self.get_room(data.request_uri) except RoomNotFoundError: try: room = self.get_room(to_header.uri) except RoomNotFoundError: log.msg(u'Subscription rejected: room not yet created') subscribe_request.reject(480) return if not room.started: log.msg(u'Subscription rejected: room not started yet') subscribe_request.reject(480) else: room.handle_incoming_subscription(subscribe_request, data) def incoming_referral(self, refer_request, data): from_header = data.headers.get('From', Null) to_header = data.headers.get('To', Null) refer_to_header = data.headers.get('Refer-To', Null) if Null in (from_header, to_header, refer_to_header): refer_request.reject(400) return log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri)) try: self.validate_acl(data.request_uri, from_header.uri) except ACLValidationError: log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri) refer_request.reject(403) return referral_handler = IncomingReferralHandler(refer_request, data) referral_handler.start() def incoming_message(self, message_request, data): log.msg(u'SIP MESSAGE is not supported, use MSRP media instead') message_request.answer(405) def accept_session(self, session, streams): if session.state == 'incoming': try: session.accept(streams, is_focus=True) except IllegalStateError: pass def add_participant(self, session, room_uri): # Keep track of the invited participants, we must skip ACL policy # for SUBSCRIBE requests room_uri_str = '%s@%s' % (room_uri.user, room_uri.host) log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri)) d = self.invited_participants_map.setdefault(room_uri_str, {}) d.setdefault(str(session.remote_identity.uri), 0) d[str(session.remote_identity.uri)] += 1 NotificationCenter().add_observer(self, sender=session) room = self.get_room(room_uri, True) room.start() room.add_session(session) def remove_participant(self, participant_uri, room_uri): try: room = self.get_room(room_uri) except RoomNotFoundError: pass else: log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri)) room.terminate_sessions(participant_uri) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): session = notification.sender room = self.get_room(session._invitation.request_uri, True) # FIXME room.start() room.add_session(session) @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) if session.direction == 'incoming': room_uri = session._invitation.request_uri # FIXME else: # Clear invited participants mapping room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host) d = self.invited_participants_map[room_uri_str] d[str(session.remote_identity.uri)] -= 1 if d[str(session.remote_identity.uri)] == 0: del d[str(session.remote_identity.uri)] room_uri = session.local_identity.uri # We could get this notifiction even if we didn't get SIPSessionDidStart try: room = self.get_room(room_uri) except RoomNotFoundError: return if session in room.sessions: room.remove_session(session) if not room.stopping and room.empty: self.remove_room(room_uri) room.stop() def _NH_SIPSessionDidFail(self, notification): session = notification.sender notification.center.remove_observer(self, sender=session) log.msg(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason)) class IncomingReferralHandler(object): implements(IObserver) def __init__(self, refer_request, data): self._refer_request = refer_request self._refer_headers = data.headers self.room_uri = data.request_uri self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host) self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri) self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper() self.session = None self.streams = [] def start(self): if not self.refer_to_uri.startswith(('sip:', 'sips:')): self.refer_to_uri = 'sip:%s' % self.refer_to_uri try: self.refer_to_uri = SIPURI.parse(self.refer_to_uri) except SIPCoreError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.reject(488) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._refer_request) if self.method == 'INVITE': self._refer_request.accept() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.refer_to_uri lookup = DNSLookup() notification_center.add_observer(self, sender=lookup) lookup.lookup_sip_proxy(uri, settings.sip.transport_list) elif self.method == 'BYE': log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self._refer_request.accept() conference_application = ConferenceApplication() conference_application.remove_participant(self.refer_to_uri, self.room_uri) self._refer_request.end(200) else: self._refer_request.reject(488) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) account = DefaultAccount() conference_application = ConferenceApplication() try: room = conference_application.get_room(self.room_uri) except RoomNotFoundError: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return else: active_media = room.active_media if not active_media: log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) self._refer_request.end(500) return if 'audio' in active_media: self.streams.append(AudioStream()) if 'chat' in active_media: self.streams.append(ChatStream()) self.session = Session(account) notification_center.add_observer(self, sender=self.session) original_from_header = self._refer_headers.get('From') if original_from_header.display_name: original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host) else: original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host) from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call') to_header = ToHeader(self.refer_to_uri) transport = notification.data.result[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)) extra_headers = [] if self._refer_headers.get('Referred-By', None) is not None: extra_headers.append(Header.new(self._refer_headers.get('Referred-By'))) else: extra_headers.append(Header('Referred-By', str(original_from_header.uri))) if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) extra_headers.append(Header('X-Originator-From', str(original_from_header.uri))) extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity)) route = notification.data.result[0] self.session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers) def _NH_DNSLookupDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) def _NH_SIPSessionGotRingIndication(self, notification): if self._refer_request is not None: self._refer_request.send_notify(180) def _NH_SIPSessionGotProvisionalResponse(self, notification): if self._refer_request is not None: self._refer_request.send_notify(notification.data.code, notification.data.reason) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) conference_application = ConferenceApplication() conference_application.add_participant(self.session, self.room_uri) log.msg('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri)) self.session = None self.streams = [] def _NH_SIPSessionDidFail(self, notification): log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code) self.session = None self.streams = [] def _NH_SIPSessionDidEnd(self, notification): # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri)) notification.center.remove_observer(self, sender=notification.sender) if self._refer_request is not None: self._refer_request.end(200) self.session = None self.streams = [] def _NH_SIPIncomingReferralDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._refer_request = None diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index 3fffa22..d49ba2e 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,451 +1,451 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlib import coros from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage -from sylk.extensions import ChatStream from sylk.session import Session +from sylk.streams import ChatStream __all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: NotificationCenter().post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) NotificationCenter().add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) def _get_xmpp_session(self): return self.__dict__['xmpp_session'] xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat') notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='DNS lookup error')) return self.msrp_stream = ChatStream() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = Session(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self) else: notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started')) def enqueue_xmpp_message(self, message): if self.started: raise RuntimeError('session is already started') self._xmpp_message_queue.append(message) def _send_queued_messages(self): if self._xmpp_message_queue: while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = CPIMIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _inactivity_timeout(self): log.msg("Ending SIP session %s due to inactivity" % self.sip_session._invitation.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self.sip_session) notification.center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client # replies to one of our messages. MESSAGE requests don't need a Contact header, though # so how should we communicate our GRUU to the recipient? self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.uri) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification.center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py index ed8fac2..9814c20 100644 --- a/sylk/applications/xmppgateway/muc.py +++ b/sylk/applications/xmppgateway/muc.py @@ -1,470 +1,470 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import random import uuid from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlib import coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.msrp import ChatStreamError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from time import time from twisted.internet import reactor from zope.interface import implements from sylk.accounts import DefaultAccount from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.logger import log from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS from sylk.configuration import SIPConfig -from sylk.extensions import ChatStream from sylk.session import Session +from sylk.streams import ChatStream class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class MucInvitationFailure(object): def __init__(self, code, reason): self.code = code self.reason = reason def __str__(self): return '%s (%s)' % (self.code, self.reason) class X2SMucInvitationHandler(object): implements(IObserver) def __init__(self, sender, recipient, participant): self.sender = sender self.recipient = recipient self.participant = participant self.active = False self.route = None self._channel = coros.queue() self._referral = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() sender_uri = self.sender.uri.as_sip_uri() recipient_uri = self.recipient.uri.as_sip_uri() participant_uri = self.participant.uri.as_sip_uri() try: # Lookup routes account = DefaultAccount() if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(recipient_uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise ReferralError(error='DNS lookup failed: %s' % e) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) refer_to_header = ReferToHeader(str(participant_uri)) refer_to_header.parameters['method'] = 'INVITE' referral = Referral(recipient_uri, FromHeader(sender_uri), ToHeader(recipient_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail, e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail, e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) finally: self.active = False except ReferralError, e: self._failure = MucInvitationFailure(e.code, e.error) finally: notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._referral = None if self._failure is not None: notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self) def _refresh(self): account = DefaultAccount() transport = self.route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters) contact_header = ContactHeader(contact_uri) self._referral.refresh(contact_header=contact_header, timeout=2) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() class S2XMucInvitationHandler(object): implements(IObserver) def __init__(self, session, sender, recipient, inviter): self.session = session self.sender = sender self.recipient = recipient self.inviter = inviter self._timer = None self._failure = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._timer = reactor.callLater(90, self._timeout) notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self) def stop(self): if self._timer is not None and self._timer.active(): self._timer.cancel() self._timer = None notification_center = NotificationCenter() if self.session is not None: notification_center.remove_observer(self, sender=self.session) reactor.callLater(5, self._end_session, self.session) self.session = None if self._failure is not None: notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure)) else: notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self) def _end_session(self, session): try: session.end(480) except Exception: pass def _timeout(self): NotificationCenter().remove_observer(self, sender=self.session) try: self.session.end(408) except Exception: pass self.session = None self._failure = MucInvitationFailure('Timeout', 408) self.stop() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidFail(self, notification): notification.center.remove_observer(self, sender=self.session) self.session = None self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code) self.stop() class X2SMucHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity, nickname): self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.nickname = nickname self._xmpp_muc_session = None self._sip_session = None self._msrp_stream = None self._first_stanza = None self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza self._participants = set() # set of (URI, nickname) tuples self.ended = False def start(self): notification_center = NotificationCenter() self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session.start() notification_center.post_notification('X2SMucHandlerDidStart', sender=self) self._start_sip_session() def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_muc_session is not None: notification_center.remove_observer(self, sender=self._xmpp_muc_session) # Send indication that the user has been kicked from the room sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('307') xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._xmpp_muc_session.end() self._xmpp_muc_session = None if self._sip_session is not None: notification_center.remove_observer(self, sender=self._sip_session) self._sip_session.end() self._sip_session = None self.ended = True notification_center.post_notification('X2SMucHandlerDidEnd', sender=self) @run_in_green_thread def _start_sip_session(self): # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = self.sip_identity.uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = DefaultAccount() if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) self.end() return self._msrp_stream = ChatStream() route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self._sip_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._sip_session) notification_center.add_observer(self, sender=self._msrp_stream) self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP multiparty session %s started" % notification.sender._invitation.call_id) if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: self.end() return message_id = self._msrp_stream.set_local_nickname(self.nickname) self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) self._first_stanza = None def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP multiparty session %s ended" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP multiparty session %s failed" % notification.sender._invitation.call_id) notification.center.remove_observer(self, sender=self._sip_session) notification.center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionNewProposal(self, notification): if notification.data.originator == 'remote': self._sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self._sip_session.reject_transfer(403) def _NH_SIPSessionGotConferenceInfo(self, notification): # Translate to XMPP payload xmpp_manager = XMPPManager() own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) conference_info = notification.data.conference_info new_participants = set() for user in conference_info.users: user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) nickname = user.display_text.value if user.display_text else user.entity new_participants.add((user_uri, nickname)) # Remove participants that are no longer in the room for uri, nickname in self._participants - new_participants: sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) xmpp_manager.send_muc_stanza(stanza) # Send presence for current participants for uri, nickname in new_participants: if uri == own_uri: continue sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = Identity(uri) xmpp_manager.send_muc_stanza(stanza) self._participants = new_participants # Send own status last sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('110') xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream if not self._xmpp_muc_session: return message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None resource = message.sender.display_name or str(message.sender.uri) sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex) self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') def _NH_ChatStreamDidSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) self.nickname = nickname def _NH_ChatStreamDidNotSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)]) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(error_stanza) def _NH_ChatStreamDidDeliverMessage(self, notification): # Echo back the message to the sender stanza = self._pending_messages_map.pop(notification.data.message_id) stanza.sender, stanza.recipient = stanza.recipient, stanza.sender stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamDidNotDeliverMessage(self, notification): self._pending_messages_map.pop(notification.data.message_id) def _NH_XMPPIncomingMucSessionDidEnd(self, notification): notification.center.remove_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session = None self.end() def _NH_XMPPIncomingMucSessionGotMessage(self, notification): if not self._sip_session: return message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri, display_name=self.nickname) message_id = self._msrp_stream.send_message(message.body, 'text/plain', local_identity=sender) self._pending_messages_map[message_id] = message # Message will be echoed back to the sender on ChatStreamDidDeliverMessage def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): if not self._sip_session: return nickname = notification.data.nickname try: message_id = self._msrp_stream.set_local_nickname(nickname) except ChatStreamError: return self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) diff --git a/sylk/server.py b/sylk/server.py index f44e180..c603357 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,229 +1,230 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import sys from threading import Event from uuid import uuid4 from application import log from application.notification import NotificationCenter from application.python import Null from eventlib import proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor -# Load extensions needed for integration with SIP SIMPLE SDK -import sylk.extensions +# Load stream extensions needed for integration with SIP SIMPLE SDK +import sylk.streams +del sylk.streams from sylk.accounts import DefaultAccount from sylk.applications import IncomingRequestHandler from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger from sylk.session import SessionManager class SylkServer(SIPApplication): def __init__(self): self.request_handler = Null self.thor_interface = Null self.logger = Logger() self.stopping_event = Event() self.stop_event = Event() def start(self, options): self.options = options if self.options.enable_bonjour: ServerConfig.enable_bonjour = True notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: super(SylkServer, self).start(MemoryStorage()) except Exception, e: log.fatal("Error starting SIP Application: %s" % e) sys.exit(1) def _initialize_core(self): # SylkServer needs to listen for extra events and request types notification_center = NotificationCenter() settings = SIPSimpleSettings() # initialize core options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=True, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # video video_codecs=list(settings.rtp.video_codec_list), # logging log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0, trace_sip=settings.logs.trace_sip, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events=set(['conference', 'presence']), incoming_requests=set(['MESSAGE'])) notification_center.add_observer(self, sender=self.engine) self.engine.start(**options) @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() dns_manager = DNSManager() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return # Initialize default account default_account = DefaultAccount() account_manager.default_account = default_account # initialize TLS self._initialize_tls() # initialize PJSIP internal resolver self.engine.set_nameservers(dns_manager.nameservers) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize instance id settings.instance_id = uuid4().urn settings.save() # initialize middleware components dns_manager.start() account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) # start SylkServer components if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode self.thor_interface = ConferenceNode() self.request_handler = IncomingRequestHandler() self.request_handler.start() @run_in_green_thread def _shutdown_subsystems(self): # shutdown SylkServer components procs = [proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)] proc.waitall(procs) # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # shutdown engine self.engine.stop() self.engine.join() # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _NH_AudioDevicesDidChange(self, notification): pass def _NH_DefaultAudioDeviceDidChange(self, notification): pass def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper())) except TypeError: pass def _NH_SIPApplicationWillEnd(self, notification): log.msg('SIP application will end: %s' % self.end_reason) self.stopping_event.set() def _NH_SIPApplicationDidEnd(self, notification): log.msg('SIP application ended') self.logger.stop() if not self.stopping_event.is_set(): log.warning('SIP application ended without shutting down all subsystems') self.stopping_event.set() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) def _NH_SIPEngineDidFail(self, notification): log.error('SIP engine failed') super(SylkServer, self)._NH_SIPEngineDidFail(notification) def _NH_ThorNetworkGotFatalError(self, notification): log.error("All Thor Event Servers have unrecoverable errors.") diff --git a/sylk/extensions.py b/sylk/streams.py similarity index 100% rename from sylk/extensions.py rename to sylk/streams.py