diff --git a/debian/control b/debian/control index a4af4a2..42d182f 100644 --- a/debian/control +++ b/debian/control @@ -1,34 +1,34 @@ Source: sylkserver Section: net Priority: optional Maintainer: Saul Ibarra Uploaders: Dan Pascu , Adrian Georgescu Build-Depends: debhelper (>= 7.3.5), python-all (>= 2.5) Standards-Version: 3.9.2 Package: sylkserver Architecture: all -Depends: ${python:Depends}, ${misc:Depends}, python-application (>= 1.2.9), python-backports, python-eventlet-0.8, python-sipsimple (>= 0.20.0), python-sqlobject (>= 0.12.4), python-twisted-web, python-twisted-words, python-wokkel (>= 0.7.0) +Depends: ${python:Depends}, ${misc:Depends}, python-application (>= 1.2.9), python-backports, python-eventlib, python-sipsimple (>= 0.20.0), python-sqlobject (>= 0.12.4), python-twisted-web, python-twisted-words, python-wokkel (>= 0.7.0) Suggests: python-mysqldb Recommends: sylkserver-sounds Description: A state of the art, extensible SIP Application Server SylkServer allows creation and delivery of rich multimedia applications accessed by SIP User Agents. The server supports SIP signaling over TLS, TCP and UDP transports, RTP and MSRP media planes, has built in capabilities for creating ad-hoc SIP multimedia conferences with HD Audio, IM and File Transfer and can be easily extended with other applications by using Python programming language. Package: sylkserver-sounds Architecture: all Depends: ${misc:Depends}, sylkserver Description: A state of the art, extensible SIP Application Server SylkServer allows creation and delivery of rich multimedia applications accessed by SIP User Agents. The server supports SIP signaling over TLS, TCP and UDP transports, RTP and MSRP media planes, has built in capabilities for creating ad-hoc SIP multimedia conferences with HD Audio, IM and File Transfer and can be easily extended with other applications by using Python programming language. . This package contains sounds used by SylkServer. diff --git a/sylk/applications/conference/database.py b/sylk/applications/conference/database.py index 2f828b1..fde82f5 100644 --- a/sylk/applications/conference/database.py +++ b/sylk/applications/conference/database.py @@ -1,74 +1,74 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # __all__ = ['async_save_message', 'get_last_messages'] import os import datetime import time from application.python import Null -from eventlet.twistedutil import block_on +from eventlib.twistedutil import block_on from sqlobject import SQLObject, DateTimeCol, UnicodeCol from twisted.internet.threads import deferToThread from sylk.applications import ApplicationLogger from sylk.applications.conference.configuration import ConferenceConfig from sylk.database import Database db = Database(ConferenceConfig.db_uri) log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) class MessageHistory(SQLObject): class sqlmeta: table = ConferenceConfig.history_table _connection = db.connection date = DateTimeCol() room_uri = UnicodeCol() sip_from = UnicodeCol() cpim_body = UnicodeCol(sqlType='LONGTEXT') cpim_content_type = UnicodeCol() cpim_sender = UnicodeCol() cpim_recipient = UnicodeCol() cpim_timestamp = DateTimeCol() db.create_table(MessageHistory) def _save_message(sip_from, room_uri, cpim_body, cpim_content_type, cpim_sender, cpim_recipient, cpim_timestamp): for x in xrange(0, 3): try: return MessageHistory(date = datetime.datetime.utcnow(), room_uri = room_uri, sip_from = sip_from, cpim_body = cpim_body, cpim_content_type = cpim_content_type, cpim_sender = cpim_sender, cpim_recipient = cpim_recipient, cpim_timestamp = cpim_timestamp) except Exception, e: time.sleep(0.2) log.error('Could not save DB records: %s' % e) return def async_save_message(sip_from, room_uri, cpim_body, cpim_content_type, cpim_sender, cpim_recipient, cpim_timestamp): if db.connection is not Null: return deferToThread(_save_message, sip_from, room_uri, cpim_body, cpim_content_type, cpim_sender, cpim_recipient, cpim_timestamp) def _get_last_messages(room_uri, count): for x in xrange(0, 3): try: return list(MessageHistory.selectBy(room_uri=room_uri)[-count:]) except Exception, e: time.sleep(0.2) log.error('Could not get DB records: %s' % e) return [] def get_last_messages(room_uri, count): if db.connection is not Null and count > 0: return block_on(deferToThread(_get_last_messages, room_uri, count)) else: return [] diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index c713d56..955cebf 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1148 +1,1148 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import hashlib import os import random import re import shutil import string import weakref from collections import defaultdict from datetime import datetime from glob import glob from itertools import chain, cycle from time import mktime try: from weakref import WeakSet except ImportError: from backports.weakref import WeakSet from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.system import makedirs -from eventlet import api, coros, proc +from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import FileTransferStream from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.streams.msrp import ChatStreamError, FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.conference import database from sylk.applications.conference.configuration import ConferenceConfig, URL from sylk.bonjour import BonjourServices from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.datatypes import ResourcePath from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) def format_identity(identity, cpim_format=False): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) elif cpim_format: return u'' % (uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.sender = sender self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) from sylk.applications.conference import ConferenceApplication port = ConferenceApplication().screen_sharing_web_server.port scheme = 'https' if ConferenceConfig.screen_sharing_use_https else 'http' self.url = URL('%s://%s:%s/' % (scheme, ConferenceConfig.screen_sharing_ip, port)) self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename)) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError, e: log.msg('Cannot write screen sharing image: %s: %s' % (self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = '%s is sharing the screen at %s' % (format_identity(self.sender, cpim_format=True), self.url) room.dispatch_server_message(txt) log.msg(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender, cpim_format=True) room.dispatch_server_message(txt) log.msg(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = CPIMIdentity.parse('' % self.uri) self.identity.display_name = 'Conference Room' self.files = [] self.screen_images = {} self.sessions = [] self.sessions_with_proposals = {} self.subscriptions = [] self.transfer_handlers = WeakSet() self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.bonjour_services = Null() self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = defaultdict(lambda: 0) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) def start(self): if self.started: return from sylk.applications.conference import ConferenceApplication if ConferenceApplication().bonjour_services is not Null: room_user = self.identity.uri.user self.bonjour_services = BonjourServices(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.start() self.state = 'started' def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None [handler.stop() for handler in self.transfer_handlers] notification_center = NotificationCenter() for subscription in self.subscriptions: notification_center.remove_observer(self, sender=subscription) subscription.end() self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screen_sharing_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: return if message.timestamp is not None: value = message.timestamp timestamp = datetime(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, value.tzinfo) else: timestamp = datetime.utcnow() recipient = message.recipients[0] sender = message.sender sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), sender.display_name) message.sender = sender database.async_save_message(format_identity(session.remote_identity, True), self.uri, message.body, message.content_type, unicode(message.sender), unicode(recipient), timestamp) private = len(message.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_message(session, message) else: self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: return recipient = data.recipients[0] private = len(data.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching private message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_iscomposing(self, session, data): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_iscomposing(self, session, data): recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity) except ChatStreamError, e: log.error(u'Error dispatching private composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) self_identity = format_identity(self.identity, cpim_format=True) database.async_save_message(self_identity, self.uri, body, content_type, self_identity, self_identity, datetime.now()) def dispatch_conference_info(self): data = self.build_conference_info_payload() for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = CPIMIdentity.parse(file.sender).uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = OutgoingFileTransferHandler(self, uri, file) self.transfer_handlers.add(handler) handler.start() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if transfer_stream.direction == 'recvonly': transfer_handler = IncomingFileTransferHandler(self, session) transfer_handler.start() txt = u'%s is uploading file %s (%s)' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size)) else: transfer_handler = OutgoingFileTransferRequestHandler(self, session) transfer_handler.start() txt = u'%s requested file %s' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8')) log.msg(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] <= 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: timer = self.sessions_with_proposals.pop(session) except KeyError: pass else: if timer.active(): timer.cancel() try: chat_stream = (stream for stream in session.streams or [] if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams or [] if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, self.format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def build_conference_info_payload(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) user_count = len(self.participants_counter.keys()) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = (user for user in users if user.entity == str(session.remote_identity.uri)).next() except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) if self.bonjour_services is Null: user = conference.User(str(session.remote_identity.uri), display_text=display_text) else: user = conference.User(str(session._invitation.remote_contact_header.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscribe_request) data = self.build_conference_info_payload() subscribe_request.accept(conference.ConferenceDocument.content_type, data) self.subscriptions.append(subscribe_request) def accept_proposal(self, session, streams): self.sessions_with_proposals.pop(session) session.accept_proposal(streams) def add_file(self, file): if file.status == 'INCOMPLETE': self.dispatch_server_message('%s has cancelled upload of file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) else: self.dispatch_server_message('%s has uploaded file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream._session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith('text/'): self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': self.add_screen_image(message.sender, message.body) def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and self.session_nickname_map[session] != nickname: notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) try: self.subscriptions.remove(subscription) except ValueError: pass def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.dispatch_conference_info() def _NH_SIPSessionGotProposal(self, notification): session = notification.sender audio_streams = [stream for stream in notification.data.streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(4, self.accept_proposal, session, streams) self.sessions_with_proposals[session] = timer def _NH_SIPSessionGotRejectProposal(self, notification): session = notification.sender try: timer = self.sessions_with_proposals.pop(session) except KeyError: # If the proposal couldn't be accepted by us we will not add a timer pass else: if timer.active(): timer.cancel() def _NH_SIPSessionDidRenegotiateStreams(self, notification): notification_center = NotificationCenter() session = notification.sender streams = notification.data.streams if notification.data.action == 'add': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) log.msg(u'%s has added chat to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) log.msg(u'%s has added audio to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added audio' % format_identity(session.remote_identity), exclude=session) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) elif notification.data.action == 'remove': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) log.msg(u'%s has removed chat from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() log.msg(u'%s has removed audio from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed audio' % format_identity(session.remote_identity), exclude=session) if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): notification.sender.reject_transfer(403) @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = None self._player = None def start(self): files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_play=False, volume=20) self.conference.bridge.add(self._player) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._player) def stop(self): if self._player is None: return notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._player) self._player.stop() self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() log.msg(u'Started playing music on hold') def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() log.msg(u'Stopped playing music on hold') def _play_next_file(self): self._player.filename = self.files.next() self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() def _NH_WavePlayerDidEnd(self, notification): if not self.paused: self._play_next_file() class InterruptWelcome(Exception): pass class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.procs = proc.RunningProcSet() @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.procs.spawn(self.play_audio_welcome, welcome_prompt) self.procs.spawn(self.render_chat_welcome, welcome_prompt) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = (stream for stream in self.session.streams if stream.type == 'audio').next() except StopIteration: return try: player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_play=False, volume=50) audio_stream.bridge.add(player) if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self.play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self.play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self.play_file_in_player(player, file, 0) file = ResourcePath('sounds/connected_tone.wav').normalized self.play_file_in_player(player, file, 0.1) audio_stream.bridge.remove(player) except InterruptWelcome: try: audio_stream.bridge.remove(player) except ValueError: pass else: self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() def render_chat_welcome_prompt(self): txt = 'Welcome to the conference.' user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)])) if user_count == 0: txt += ' You are the first participant in the room.' else: if user_count == 1: txt += ' There is one more participant in the room.' else: txt += ' There are %s more participants in the room.' % user_count return txt def render_chat_welcome(self, welcome_prompt): try: chat_stream = (stream for stream in self.session.streams if stream.type == 'chat').next() except StopIteration: return try: #welcome_prompt = self.render_chat_welcome_prompt() #chat_stream.send_message(welcome_prompt, 'text/plain', local_identity=self.room.identity, recipients=[self.room.identity]) remote_identity = CPIMIdentity.parse(format_identity(self.session.remote_identity, cpim_format=True)) for msg in database.get_last_messages(self.room.uri, ConferenceConfig.replay_history): recipient = CPIMIdentity.parse(msg.cpim_recipient) sender = CPIMIdentity.parse(msg.cpim_sender) if recipient.uri in (self.room.identity.uri, remote_identity.uri) or sender.uri == remote_identity.uri: chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp) except InterruptWelcome: pass def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall(InterruptWelcome) class RoomFile(object): def __init__(self, name, hash, size, sender, status): self.name = name self.hash = hash self.size = size self.sender = sender self.status = status @property def file_selector(self): return FileSelector.for_file(self.name.encode('utf-8'), hash=self.hash) class IncomingFileTransferHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.room_uri = room.uri self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly').next() self.error = False self.ended = False self.file = None self.file_selector = None self.filename = None self.hash = None self.status = None self.timer = None self.transfer_finished = False def start(self): self.file_selector = self.stream.file_selector path = os.path.join(ConferenceConfig.file_transfer_dir, self.room_uri) makedirs(path) self.filename = filename = os.path.join(path, self.file_selector.name.decode('utf-8')) basename, ext = os.path.splitext(filename) i = 1 while os.path.exists(filename): filename = '%s_%d%s' % (basename, i, ext) i += 1 self.filename = filename try: self.file = open(self.filename, 'wb') except EnvironmentError: log.msg('Cannot write destination filename: %s' % self.filename) self.session.end() return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) self.hash = hashlib.sha1() @run_in_thread('file-transfer') def write_chunk(self, data): notification_center = NotificationCenter() if data is not None: try: self.file.write(data) except EnvironmentError, e: notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e))) else: self.hash.update(data) else: self.file.close() if self.error: notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self) else: notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self) @run_in_thread('file-io') def remove_bogus_file(self, filename): try: os.unlink(filename) except OSError: pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidEnd(self, notification): self.ended = True if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) # Mark end of write operation self.write_chunk(None) def _NH_FileTransferStreamGotChunk(self, notification): self.write_chunk(notification.data.content) def _NH_FileTransferStreamDidFinish(self, notification): self.transfer_finished = True if self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerGotError(self, notification): log.error('Error while handling incoming file transfer: %s' % notification.data.error) self.error = True self.status = notification.data.error if not self.ended and self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) remote_hash = self.file_selector.hash if not self.transfer_finished: log.msg('File transfer of %s cancelled' % os.path.basename(self.filename)) self.remove_bogus_file(self.filename) self.status = 'INCOMPLETE' else: local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper())) if local_hash != remote_hash: log.warning('Hash of transferred file does not match the remote hash (file may have changed).') self.status = 'Hash missmatch' self.remove_bogus_file(self.filename) else: self.status = 'OK' file = RoomFile(self.filename, remote_hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None def _NH_IncomingFileTransferHandlerDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) room = self.room() or Null room.add_file(file) self.session = None self.stream = None class OutgoingFileTransferRequestHandler(object): implements(IObserver) def __init__(self, room, session): self.room = weakref.ref(room) self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer').next() self.timer = None def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd class InterruptFileTransfer(Exception): pass class OutgoingFileTransferHandler(object): implements(IObserver) def __init__(self, room, destination, file): self.room_uri = room.identity.uri self.destination = destination self.file = file self.session = None self.stream = None self.timer = None @run_in_green_thread def start(self): self.greenlet = api.getcurrent() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(self.destination) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: return notification_center = NotificationCenter() self.session = ServerSession(account) self.stream = FileTransferStream(account, self.file.file_selector, 'sendonly') notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) subject = u'File uploaded by %s' % self.file.sender from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(self.destination)) transport = routes[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) originator_uri = CPIMIdentity.parse(self.file.sender).uri extra_headers.append(Header('X-Originator-From', str(originator_uri))) self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, subject=subject, extra_headers=extra_headers) def stop(self): if self.session is not None: self.session.end() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidEnd(self, notification): if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py index 372c9c5..f5051da 100644 --- a/sylk/applications/ircconference/room.py +++ b/sylk/applications/ircconference/room.py @@ -1,635 +1,635 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # import random import urllib from application import log from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton -from eventlet import coros, proc +from eventlib import coros, proc from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.streams.msrp import ChatStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from twisted.internet import protocol, reactor from twisted.words.protocols import irc from zope.interface import implements from sylk.applications.ircconference.configuration import get_room_configuration from sylk.configuration.datatypes import ResourcePath def format_identity(identity, cpim_format=False): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) elif cpim_format: return u'' % (uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type class IRCMessage(object): def __init__(self, username, uri, body, content_type='text/plain'): self.sender = CPIMIdentity(uri, display_name=username) self.body = body self.content_type = content_type class IRCRoom(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ __metaclass__ = Singleton implements(IObserver) def __init__(self, uri): self.uri = uri self.identity = CPIMIdentity.parse('' % self.uri) self.sessions = [] self.sessions_with_proposals = [] self.subscriptions = [] self.pending_messages = [] self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.conference_info_payload = None self.irc_connector = None self.irc_protocol = None @classmethod def get_room(cls, uri): room_uri = '%s@%s' % (uri.user, uri.host) room = cls(room_uri) return room @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' def start(self): if self.state != 'stopped': return config = get_room_configuration(self.uri.split('@')[0]) factory = IRCBotFactory(config) host, port = config.server self.irc_connector = reactor.connectTCP(host, port, factory) NotificationCenter().add_observer(self, sender=self.irc_connector.factory) self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.state = 'started' def stop(self): if self.state != 'started': return self.state = 'stopped' NotificationCenter().remove_observer(self, sender=self.irc_connector.factory) self.irc_connector.factory.stop_requested = True self.irc_connector.disconnect() self.irc_connector = None self.message_dispatcher.kill(proc.ProcExit) self.moh_player = None self.audio_conference = None def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'msrp_message': if data.sender.uri != session.remote_identity.uri: return self.dispatch_message(session, data) elif message_type == 'irc_message': self.dispatch_irc_message(data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(message.body, message.content_type, local_identity=identity, recipients=[self.identity], timestamp=message.timestamp) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_irc_message(self, message): for session in self.sessions: try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e)) def get_conference_info(self): # Send request to get participants list, we'll get a notification with it if self.irc_protocol is not None: self.irc_protocol.get_participants() else: self.dispatch_conference_info([]) def dispatch_conference_info(self, irc_participants): data = self.build_conference_info_payload(irc_participants) for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def build_conference_info_payload(self, irc_participants): irc_configuration = get_room_configuration(self.uri.split('@')[0]) if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent) host_info = HostInfo(web_page=WebPage(irc_configuration.website)) self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users()) user_count = len(set(str(s.remote_identity.uri) for s in self.sessions)) + len(irc_participants) self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True) users = Users() for session in self.sessions: try: user = (user for user in users if user.entity == str(session.remote_identity.uri)).next() except StopIteration: user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name) users.add(user) joining_info = JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected') endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status) for stream in session.streams: endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream))) user.add(endpoint) for nick in irc_participants: irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0]) user = User(irc_uri, display_text=nick) users.add(user) endpoint = Endpoint(irc_uri, display_text=nick) endpoint.add(Media(random.randint(100000000, 999999999), media_type='message')) user.add(endpoint) self.conference_info_payload.users = users return self.conference_info_payload.toxml() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) self.play_audio_welcome(session) self.get_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() try: chat_stream = (stream for stream in session.streams or [] if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams or [] if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.get_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session))) def accept_proposal(self, session, streams): if session in self.sessions_with_proposals: session.accept_proposal(streams) self.sessions_with_proposals.remove(session) def _play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) @run_in_green_thread def play_audio_welcome(self, session, welcome_prompt=True): audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_play=False, volume=50) audio_stream.bridge.add(player) if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self._play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self._play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is.wav').normalized self._play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self._play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self._play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self._play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self._play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self._play_file_in_player(player, file, 0) audio_stream.bridge.remove(player) self.audio_conference.add(audio_stream) self.audio_conference.unhold() def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return NotificationCenter().add_observer(self, sender=subscribe_request) subscribe_request.accept() self.subscriptions.append(subscribe_request) self.get_conference_info() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) self.subscriptions.remove(subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.get_conference_info() def _NH_SIPSessionGotProposal(self, notification): session = notification.sender audio_streams = [stream for stream in notification.data.streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return if chat_streams: chat_streams[0].chatroom_capabilities = [] streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] self.sessions_with_proposals.append(session) reactor.callLater(4, self.accept_proposal, session, streams) def _NH_SIPSessionGotRejectProposal(self, notification): session = notification.sender self.sessions_with_proposals.remove(session) def _NH_SIPSessionDidRenegotiateStreams(self, notification): notification_center = NotificationCenter() session = notification.sender streams = notification.data.streams if notification.data.action == 'add': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) log.msg(u'%s has added chat to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) log.msg(u'%s has added audio to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added audio' % format_identity(session.remote_identity), exclude=session) self.play_audio_welcome(session, False) elif notification.data.action == 'remove': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) log.msg(u'%s has removed chat from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.audio_conference.hold() log.msg(u'%s has removed audio from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed audio' % format_identity(session.remote_identity), exclude=session) if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.get_conference_info() def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream._session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') # Send MSRP chat message to other participants message = notification.data.message session = notification.sender.session self.incoming_message_queue.send((session, 'msrp_message', message)) # Send MSRP chat message to IRC chat room body = message.body sender = message.sender irc_message = '%s: %s' % (format_identity(sender), body) if self.irc_protocol is not None: self.irc_protocol.send_message(irc_message.encode('utf-8')) else: self.pending_messages.append(irc_message) def _NH_IRCBotGotConnected(self, notification): self.irc_protocol = notification.data.protocol # Send enqueued messages while self.pending_messages: message = self.pending_messages.pop(0) self.irc_protocol.send_message(message.encode('utf-8')) # Update participants list self.get_conference_info() def _NH_IRCBotGotDisconnected(self, notification): self.irc_protocol = None def _NH_IRCBotGotMessage(self, notification): message = notification.data.message self.incoming_message_queue.send((None, 'irc_message', message)) def _NH_IRCBotGotParticipantsList(self, notification): self.dispatch_conference_info(notification.data.participants) def _NH_IRCBotJoinedChannel(self, notification): self.get_conference_info() def _NH_IRCBotUserJoined(self, notification): self.dispatch_server_message('%s joined the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserLeft(self, notification): self.dispatch_server_message('%s left the IRC channel' % notification.data.user) self.get_conference_info() def _NH_IRCBotUserQuit(self, notification): self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason)) self.get_conference_info() def _NH_IRCBotUserKicked(self, notification): data = notification.data self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason)) self.get_conference_info() def _NH_IRCBotUserRenamed(self, notification): self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname)) self.get_conference_info() def _NH_IRCBotUserAction(self, notification): self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action)) class IRCBot(irc.IRCClient): nickname = 'SylkServer' def __init__(self): self._nick_collector = [] self.nicks = [] def connectionMade(self): irc.IRCClient.connectionMade(self) log.msg('Connection to IRC has been established') NotificationCenter().post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self)) def connectionLost(self, failure): irc.IRCClient.connectionLost(self, failure) NotificationCenter().post_notification('IRCBotGotDisconnected', self.factory, NotificationData()) def signedOn(self): log.msg('Logging into %s channel...' % self.factory.channel) - self.join(self.factory.channel) + self.join(self.factory.channel) def kickedFrom(self, channel, kicker, message): log.msg('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message)) self.join(self.factory.channel) def joined(self, channel): log.msg('Logged into %s channel' % channel) NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, NotificationData(channel=self.factory.channel)) def privmsg(self, user, channel, message): if channel == '*': return username = user.split('!', 1)[0] if username == self.nickname: return if channel == self.nickname: self.msg(username, "Sorry, I don't support private messages, I'm a bot.") return uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0])) irc_message = IRCMessage(username, uri, message.decode('utf-8')) data = NotificationData(message=irc_message) NotificationCenter().post_notification('IRCBotGotMessage', self.factory, data) def send_message(self, message): self.say(self.factory.channel, message) def get_participants(self): self.sendLine("NAMES #%s" % self.factory.channel) def got_participants(self, nicks): data = NotificationData(participants=nicks) NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, data) def irc_RPL_NAMREPLY(self, prefix, params): """Collect usernames from this channel. Several of these messages may be sent to cover the channel's full nicklist. An RPL_ENDOFNAMES signals the end of the list. """ # We just separate these into individual nicks and stuff them in # the nickCollector, transferred to 'nicks' when we get the RPL_ENDOFNAMES. for name in params[3].split(): # Remove operator and voice prefixes if name[0] in '@+': name = name[1:] if name != self.nickname: self._nick_collector.append(name) def irc_RPL_ENDOFNAMES(self, prefix, params): """This is sent after zero or more RPL_NAMREPLY commands to terminate the list of users in a channel. """ self.nicks = self._nick_collector self._nick_collector = [] self.got_participants(self.nicks) def userJoined(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserJoined', self.factory, data) def userLeft(self, user, channel): if channel.strip('#') == self.factory.channel: data = NotificationData(user=user) NotificationCenter().post_notification('IRCBotUserLeft', self.factory, data) def userQuit(self, user, reason): data = NotificationData(user=user, reason=reason) NotificationCenter().post_notification('IRCBotUserQuit', self.factory, data) def userKicked(self, kickee, channel, kicker, message): if channel.strip('#') == self.factory.channel: data = NotificationData(kickee=kickee, kicker=kicker, reason=message) NotificationCenter().post_notification('IRCBotUserKicked', self.factory, data) def userRenamed(self, oldname, newname): data = NotificationData(oldname=oldname, newname=newname) NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, data) def action(self, user, channel, data): if channel.strip('#') == self.factory.channel: username = user.split('!', 1)[0] data = NotificationData(user=username, action=data) NotificationCenter().post_notification('IRCBotUserAction', self.factory, data) class IRCBotFactory(protocol.ClientFactory): protocol = IRCBot def __init__(self, config): self.config = config self.channel = config.channel self.stop_requested = False def clientConnectionLost(self, connector, failure): log.msg('Disconnected from IRC: %s' % failure.getErrorMessage()) if not self.stop_requested: log.msg('Reconnecting...') connector.connect() def clientConnectionFailed(self, connector, failure): log.error('Connection to IRC server failed: %s' % failure.getErrorMessage()) diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index e0b9b1b..8f7123d 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,458 +1,458 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque -from eventlet import coros +from eventlib import coros from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.extensions import ChatStream from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: notification_center = NotificationCenter() notification_center.post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) def _get_xmpp_session(self): return self.__dict__['xmpp_session'] xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) notification_center.post_notification('ChatSessionDidFail', sender=self) return self.msrp_stream = ChatStream(account) route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = ServerSession(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: notification_center.post_notification('ChatSessionDidEnd', sender=self) else: notification_center.post_notification('ChatSessionDidFail', sender=self) def enqueue_xmpp_message(self, message): if self.started: raise RuntimeError('session is already started') self._xmpp_message_queue.append(message) def _send_queued_messages(self): if self._xmpp_message_queue: while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = CPIMIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _inactivity_timeout(self): log.msg("Ending SIP session %s due to incactivity" % self.sip_session._invitation.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionGotProposal(self, notification): self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.get_uri()) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index c8a0481..7866532 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,482 +1,482 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import hashlib import os import random import urllib from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute -from eventlet import coros, proc +from eventlib import coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from time import time from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription from sylk.configuration import SIPConfig log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self) def add_sip_subscription(self, subscription): self._sip_subscriptions.append(subscription) notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None subscription.accept(content_type, pidf_doc) else: subscription.accept_pending() def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = self._stanza_cache.iterkeys().next() person = pidf.Person("ID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'extended-away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': stanza.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: stanza.extended = 'available' resource = encode_resource(stanza.sender.uri.resource) tuple_id = "ID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource contact = pidf.Contact(str(sip_uri)) tuple = pidf.Service(tuple_id, status=status, contact=contact) tuple.add(pidf.DeviceID(resource)) tuple.device_info = pidf.DeviceInfo(resource, description=urllib.quote(stanza.sender.uri.resource.encode('utf-8'))) for lang, note in stanza.statuses.iteritems(): tuple.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(tuple) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_twisted_thread def _NH_SIPIncomingSubscriptionDidEnd(self, notification): notification_center = NotificationCenter() subscription = notification.sender notification_center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if not self._sip_subscriptions: self.end() def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza pidf_doc = self._build_pidf() for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) except SIPCoreError: pass if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) except ParserError, e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = (p for p in pidf_doc.persons).next() except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = service.device_info.id else: sip_contact.parameters['gr'] = service.id # TODO: pseudorandom thing with AoR? sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'extended-away': stanza.show = 'xa' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' elif set(('holiday', 'vacation')).intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = AccountManager().sylkserver_account refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters) subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.get_uri()), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if self._xmpp_subscription is None: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail, e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) except SubscriptionError, e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py index d20d22f..df78a7f 100644 --- a/sylk/applications/xmppgateway/xmpp/session.py +++ b/sylk/applications/xmppgateway/xmpp/session.py @@ -1,223 +1,223 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton -from eventlet import coros, proc +from eventlib import coros, proc from twisted.internet import reactor from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, MUCAvailabilityPresence __all__ = ['XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager'] # Chat sessions class XMPPChatSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.pending_receipts = {} self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self.send_composing_indication('gone') self._clear_pending_receipts() self._proc.kill() self._proc = None notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, body, html_body, message_id=None, use_receipt=True): message = ChatMessage(self.local_identity, self.remote_identity, body, html_body, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_composing_indication(self, state, message_id=None, use_receipt=False): message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_receipt_acknowledgement(self, receipt_id): message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) self.xmpp_manager.send_stanza(message) def send_error(self, stanza, error_type, conditions): message = ErrorStanza.from_stanza(stanza, error_type, conditions) self.xmpp_manager.send_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, ChatMessage): notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, ChatComposingIndication): if item.state == 'gone': self._clear_pending_receipts() notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='remote')) self.state = 'terminated' break else: notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=NotificationData(message=item)) elif isinstance(item, MessageReceipt): if item.receipt_id in self.pending_receipts: timer = self.pending_receipts.pop(item.receipt_id) timer.cancel() notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=NotificationData(message_id=item.receipt_id)) elif isinstance(item, ErrorStanza): if item.id in self.pending_receipts: timer = self.pending_receipts.pop(item.id) timer.cancel() # TODO: translate cause notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=item.id, code=503, reason='Service Unavailable')) self._proc = None def _receipt_timer_expired(self, message_id): self.pending_receipts.pop(message_id) notification_center = NotificationCenter() notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) def _clear_pending_receipts(self): notification_center = NotificationCenter() while self.pending_receipts: message_id, timer = self.pending_receipts.popitem() timer.cancel() notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) class XMPPChatSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPChatSessionDidStart') notification_center.add_observer(self, name='XMPPChatSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPChatSessionDidStart') notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPChatSessionDidStart(self, notification): session = notification.sender self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session def _NH_XMPPChatSessionDidEnd(self, notification): session = notification.sender del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] # MUC sessions class XMPPIncomingMucSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingMucSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self._proc.kill() self._proc = None notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, sender, body, html_body, message_id=None): # TODO: timestamp? message = GroupChatMessage(sender, self.remote_identity, body, html_body, id=message_id) self.xmpp_manager.send_muc_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, GroupChatMessage): notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, MUCAvailabilityPresence): if item.available: nickname = item.recipient.uri.resource notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=NotificationData(stanza=item, nickname=nickname)) else: notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' break self._proc = None class XMPPMucSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming = {} self.outgoing = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.add_observer(self, name='XMPPIncomingMucSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPIncomingMucSessionDidStart(self, notification): muc = notification.sender self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] = muc def _NH_XMPPIncomingMucSessionDidEnd(self, notification): muc = notification.sender del self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] diff --git a/sylk/applications/xmppgateway/xmpp/subscription.py b/sylk/applications/xmppgateway/xmpp/subscription.py index a4e2695..9ca61ec 100644 --- a/sylk/applications/xmppgateway/xmpp/subscription.py +++ b/sylk/applications/xmppgateway/xmpp/subscription.py @@ -1,206 +1,206 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton -from eventlet import coros, proc +from eventlib import coros, proc from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import SubscriptionPresence, ProbePresence, AvailabilityPresence __all__ = ['XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager'] class XMPPSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: notification_center = NotificationCenter() notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): notification_center = NotificationCenter() notification_center.post_notification('XMPPSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) self.subscribe() def end(self): if self.state == 'terminated': return self._proc.kill() self._proc = None notification_center = NotificationCenter() notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def subscribe(self): self.state = 'subscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') self.xmpp_manager.send_stanza(stanza) # If we are already subscribed we may not receive an answer, send a probe just in case self._send_probe() def unsubscribe(self): self.state = 'unsubscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') self.xmpp_manager.send_stanza(stanza) def _send_probe(self): self.state = 'subscribe_sent' stanza = ProbePresence(self.local_identity, self.remote_identity) self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, AvailabilityPresence): if self.state == 'subscribe_sent': self.state == 'active' notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=NotificationData(presence=item)) elif isinstance(item, SubscriptionPresence): if self.state == 'subscribe_sent' and item.type == 'subscribed': self.state = 'active' elif item.type == 'unsubscribed': prev_state = self.state self.state = 'terminated' if prev_state in ('active', 'unsubscribe_sent'): notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self) else: notification_center.post_notification('XMPPSubscriptionDidFail', sender=self) break self._proc = None class XMPPIncomingSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) def end(self): if self.state == 'terminated': return self.state = 'terminated' self._proc.kill() self._proc = None notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) def accept(self): self.state = 'active' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') self.xmpp_manager.send_stanza(stanza) def reject(self): self.state = 'terminating' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') self.xmpp_manager.send_stanza(stanza) self.end() def send_presence(self, stanza): self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, SubscriptionPresence): if item.type == 'subscribe': notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self) elif item.type == 'unsubscribe': self.state = 'terminated' notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self) notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) break elif isinstance(item, ProbePresence): notification_center = NotificationCenter() notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self) self._proc = None class XMPPSubscriptionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming_subscriptions = {} self.outgoing_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPSubscriptionDidStart') notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPSubscriptionDidStart(self, notification): subscription = notification.sender self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPSubscriptionDidEnd(self, notification): subscription = notification.sender del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] def _NH_XMPPIncomingSubscriptionDidStart(self, notification): subscription = notification.sender self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] diff --git a/sylk/bonjour.py b/sylk/bonjour.py index 25b52c1..2eb6ce3 100644 --- a/sylk/bonjour.py +++ b/sylk/bonjour.py @@ -1,239 +1,239 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application import log from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null -from eventlet import api, coros, proc -from eventlet.green import select +from eventlib import api, coros, proc +from eventlib.green import select from sipsimple.account import AccountManager from sipsimple.account.bonjour import _bonjour, BonjourRegistrationFile from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from twisted.internet import reactor from zope.interface import implements class RestartSelect(Exception): pass class BonjourServices(object): implements(IObserver) def __init__(self, service='sipfocus', name='SylkServer', uri_user=None): self.account = AccountManager().sylkserver_account self.service = service self.name = name self.uri_user = uri_user self._stopped = True self._files = [] self._command_channel = coros.queue() self._select_proc = None self._register_timer = None self._update_timer = None self._wakeup_timer = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) self._activate() @run_in_green_thread def stop(self): self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SystemIPAddressDidChange') notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def _activate(self): self._stopped = False self._command_channel.send(Command('register')) def _deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._stopped = True def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self, data=NotificationData(name=name, transport=file.transport)) else: error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if not self._stopped: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() for transport in missing_transports: notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport)) try: contact_uri = self.account.contact[transport] contact_uri.user = self.uri_user contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri)) file = _bonjour.DNSServiceRegister(name=str(contact_uri), regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'), port=contact_uri.port, callBack=self._register_cb, txtRecord=_bonjour.TXTRecord(items=txtdata)) except (_bonjour.BonjourError, KeyError), e: notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact_uri = self.account.contact[file.transport] contact_uri.user = self.user_uri contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri)) _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) except (_bonjour.BonjourError, KeyError), e: notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self, data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.err() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None old_files = self._files self._files = [] self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SystemIPAddressDidChange(self, notification): if self._files: self.restart_registration() def _NH_SystemDidWakeUpFromSleep(self, notification): if self._wakeup_timer is None: def wakeup_action(): if self._files: self.restart_registration() self._wakeup_timer = None self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize diff --git a/sylk/extensions.py b/sylk/extensions.py index 66a04f8..7e09562 100644 --- a/sylk/extensions.py +++ b/sylk/extensions.py @@ -1,215 +1,215 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import random from datetime import datetime from dateutil.tz import tzlocal from application.notification import NotificationCenter, NotificationData -from eventlet import api +from eventlib import api from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from msrplib.protocol import URI from msrplib.session import contains_mime_type from msrplib.transport import make_response from sipsimple.core import SDPAttribute from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage, State, LastActive, Refresh, ContentType from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.streams.msrp import ChatStreamError, MSRPStreamError, NotificationProxyLogger from sipsimple.streams.msrp import ChatStream as _ChatStream, MSRPStreamBase as _MSRPStreamBase from sipsimple.threading.green import run_in_green_thread from twisted.python.failure import Failure from sylk.configuration import SIPConfig # We need to be able to set the local identity in the message CPIM envelope # so that messages appear to be coming from the users themselves, instead of # just seeying the server identity registry = MediaStreamRegistry() for stream_type in registry.stream_types[:]: if stream_type is _ChatStream: registry.stream_types.remove(stream_type) break del registry class MSRPStreamBase(_MSRPStreamBase): @run_in_green_thread def initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.account.msrp.transport outgoing = direction=='outgoing' logger = NotificationProxyLogger() if self.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif not outgoing and not self.account.nat_traversal.use_msrp_relay_for_inbound: if self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' elif outgoing and not self.account.nat_traversal.use_msrp_relay_for_outbound: self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if self.account.nat_traversal.msrp_relay is None: relay_host = relay_port = None else: if self.transport != self.account.nat_traversal.msrp_relay.transport: raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") relay_host = self.account.nat_traversal.msrp_relay.host relay_port = self.account.nat_traversal.msrp_relay.port relay = MSRPRelaySettings(domain=self.account.uri.host, username=self.account.uri.user, password=self.account.credentials.password, host=relay_host, port=relay_port, use_tls=self.transport=='tls') self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(self.local_uri) self.local_media = self._create_local_media(full_local_path) except api.GreenletExit: raise except Exception, ex: ndata = NotificationData(context='initialize', failure=Failure(), reason=str(ex)) notification_center.post_notification('MediaStreamDidFail', self, ndata) else: notification_center.post_notification('MediaStreamDidInitialize', self) finally: if self.msrp_session is None and self.msrp is None and self.msrp_connector is None: notification_center.remove_observer(self, sender=self) self.greenlet = None class ChatStream(_ChatStream, MSRPStreamBase): accept_types = ['message/cpim'] accept_wrapped_types = ['*'] chatroom_capabilities = ['nickname', 'private-messages', 'com.ag-projects.screen-sharing'] @property def local_uri(self): return URI(host=SIPConfig.local_ip, port=0, use_tls=self.transport=='tls', credentials=self.account.tls_credentials) def _create_local_media(self, uri_path): local_media = MSRPStreamBase._create_local_media(self, uri_path) if self.session.local_focus and self.chatroom_capabilities: local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.chatroom_capabilities))) return local_media def _handle_SEND(self, chunk): # This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if not chunk.data: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.segment is not None: self.incoming_queue.setdefault(chunk.message_id, []).append(chunk.data) if chunk.final: chunk.data = ''.join(self.incoming_queue.pop(chunk.message_id)) else: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.content_type.lower() == 'message/cpim': try: message = CPIMMessage.parse(chunk.data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return else: if message.timestamp is None: message.timestamp = datetime.now(tzlocal()) if message.sender is None: message.sender = self.remote_identity private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity else: self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return # TODO: check wrapped content-type and issue a report if it's invalid notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: data = IsComposingDocument.parse(message.body) ndata = NotificationData(state=data.state.value, refresh=data.refresh.value if data.refresh is not None else None, content_type=data.content_type.value if data.content_type is not None else None, last_active=data.last_active.value if data.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata) else: notification_center.post_notification('ChatStreamGotMessage', self, NotificationData(message=message, private=private, chunk=chunk)) def _handle_NICKNAME(self, chunk): nickname = chunk.headers['Use-Nickname'].decoded notification_center = NotificationCenter() notification_center.post_notification('ChatStreamGotNicknameRequest', self, NotificationData(nickname=nickname, chunk=chunk)) @run_in_green_thread def _send_response(self, response): try: self.msrp_session.send_chunk(response) except Exception: pass def accept_nickname(self, chunk): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) response = make_response(chunk, 200, 'OK') self._send_response(response) def reject_nickname(self, chunk, code, reason): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) response = make_response(chunk, code, reason) self._send_response(response) def send_message(self, content, content_type='text/plain', local_identity=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, notify_progress=True, success_report='yes', failure_report='yes'): if self.direction=='recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if message_id is None: message_id = '%x' % random.getrandbits(64) if not contains_mime_type(self.accept_wrapped_types, content_type): raise ChatStreamError('Invalid content_type for outgoing message: %r' % content_type) if not recipients: recipients = [self.remote_identity] if timestamp is None: timestamp = datetime.now() # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, content_type, sender=local_identity or self.local_identity, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report=failure_report, success_report=success_report, notify_progress=notify_progress) return message_id def send_composing_indication(self, state, refresh, last_active=None, recipients=None, local_identity=None, message_id=None, notify_progress=False, success_report='no', failure_report='partial'): if self.direction == 'recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if state not in ('active', 'idle'): raise ValueError('Invalid value for composing indication state') if message_id is None: message_id = '%x' % random.getrandbits(64) content = IsComposingMessage(state=State(state), refresh=Refresh(refresh), last_active=LastActive(last_active or datetime.now()), content_type=ContentType('text')).toxml() if recipients is None: recipients = [self.remote_identity] # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, IsComposingDocument.content_type, sender=local_identity or self.local_identity, recipients=recipients, timestamp=datetime.now()) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report='partial', success_report='no') return message_id diff --git a/sylk/interfaces/sipthor.py b/sylk/interfaces/sipthor.py index dd58899..09b91bc 100644 --- a/sylk/interfaces/sipthor.py +++ b/sylk/interfaces/sipthor.py @@ -1,111 +1,111 @@ # Copyright (C) 2011 AG-Projects. # # This module is proprietary to AG Projects. Use of this module by third # parties is not supported. __all__ = ['ConferenceNode'] from application import log from application.notification import NotificationCenter, NotificationData from application.python.types import Singleton -from eventlet.twistedutil import block_on, callInGreenThread +from eventlib.twistedutil import block_on, callInGreenThread from gnutls.interfaces.twisted import X509Credentials from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL from twisted.internet import defer from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.scheduler import KeepRunning import sylk from sylk.configuration import SIPConfig, ThorNodeConfig class ConferenceNode(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): # Needs to be called from a green thread self.node = ThorEntity(SIPConfig.local_ip, ['conference_server', 'xmpp_gateway'], version=sylk.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials) def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True available_connectors = set(c for c in self.connectors if not c.failed) if not available_connectors: log.fatal("All Thor Event Servers have unrecoverable errors.") NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self) def stop(self): # Needs to be called from a green thread self._shutdown() def _monitor_event_servers(self): def wrapped_func(): servers = self._get_event_servers() self._update_event_servers(servers) callInGreenThread(wrapped_func) return KeepRunning def _disconnect_all(self): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) block_on(d) self._disconnect_all() def handle_event(self, event): #print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role updated = False for role in ('sip_proxy', 'conference_server', 'xmpp_gateway'): try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) updated = True if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) updated = True #print "Thor %s nodes: %s" % (role, str(network.nodes)) if updated: NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=NotificationData(networks=self.networks)) diff --git a/sylk/server.py b/sylk/server.py index d547191..685750e 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,240 +1,240 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import sys from threading import Event from application import log from application.notification import NotificationCenter, NotificationData -from eventlet import api, proc +from eventlib import api, proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError from sipsimple.lookup import DNSManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread from twisted.internet import reactor # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions from sylk.applications import IncomingRequestHandler from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger from sylk.session import SessionManager class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = None self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryStorage()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): # Command line options settings = SIPSimpleSettings() settings.bonjour.enabled = '--use-bonjour' in sys.argv # Horrible hack, I know account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager # Disable MSRP ACM if we are using Bonjour account.msrp.connection_model = 'relay' if settings.bonjour.enabled else 'acm' account.save() account_manager.sylkserver_account = account @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return account = account_manager.sylkserver_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events=set(['conference', 'presence']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' notification_center.post_notification('SIPApplicationDidStart', sender=self) @run_in_green_thread def _shutdown_subsystems(self): # cleanup internals if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None # shutdown SIPThor interface sipthor_proc = proc.spawn(self._stop_sipthor) sipthor_proc.wait() # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # shutdown engine engine = Engine() engine.stop() # TODO: timeout should be removed when the Engine is fixed so that it never hangs. -Saul try: with api.timeout(15): while True: notification = self._channel.wait() if notification.name == 'SIPEngineDidEnd': break except api.TimeoutError: pass # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _start_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode() def _stop_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode().stop() def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass # Start request handler self.request_handler = IncomingRequestHandler() self.request_handler.start() # Start SIPThor interface proc.spawn(self._start_sipthor) def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) def _NH_ThorNetworkGotFatalError(self, notification): self.stop() diff --git a/sylk/session.py b/sylk/session.py index f2cf939..bfeb00e 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,626 +1,626 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import random from datetime import datetime from time import time from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from application.python.types import Singleton -from eventlet import api, coros, proc +from eventlib import api, coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Invitation, Subscription, SIPCoreError, sip_status_messages from sipsimple.core import ContactHeader, RouteHeader, SubjectHeader, FromHeader, ToHeader from sipsimple.core import SIPURI, SDPConnection, SDPSession from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.session import Session from sipsimple.session import SessionReplaceHandler, TransferHandler, DialogID, TransferInfo from sipsimple.session import InvitationDisconnectedError, MediaStreamDidFailError, InterruptSubscription, TerminateSubscription, SubscriptionError, SIPSubscriptionDidFail from sipsimple.session import transition_state from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from twisted.internet import reactor from zope.interface import implements from sylk.configuration import SIPConfig class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None self._wakeup_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='DNSNameserversDidChange') notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='DNSNameserversDidChange') notification_center.remove_observer(self, name='SystemIPAddressDidChange') notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters) subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.get_uri()), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError, e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_DNSNameserversDidChange(self, notification): if self.active: self._resubscribe() def _NH_SystemIPAddressDidChange(self, notification): if self.active: self._resubscribe() def _NH_SystemDidWakeUpFromSleep(self, notification): if self._wakeup_timer is None: def wakeup_action(): if self.active: self._resubscribe() self._wakeup_timer = None self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize class ServerSession(Session): def init_incoming(self, invitation, data): notification_center = NotificationCenter() remote_sdp = invitation.sdp.proposed_remote self.proposed_streams = [] if remote_sdp: for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self.account, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True try: self.__dict__['subject'] = data.headers['Subject'].subject except KeyError: pass if 'Referred-By' in data.headers or 'Replaces' in data.headers: self.transfer_info = TransferInfo() if 'Referred-By' in data.headers: self.transfer_info.referred_by = data.headers['Referred-By'].body if 'Replaces' in data.headers: replaces_header = data.headers.get('Replaces') replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) session_manager = SessionManager() try: self.replaced_session = (session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id).next() except StopIteration: invitation.send_response(481) return else: self.transfer_info.replaced_dialog_id = replaced_dialog_id replace_handler = SessionReplaceHandler(self) replace_handler.start() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) else: invitation.send_response(488) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, from_header, to_header, routes, streams, contact_header=None, is_focus=False, subject=None, extra_headers=[]): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) self.transfer_handler = Null self.__dict__['subject'] = subject notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams)) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] except KeyError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) local_ip = contact_header.uri.host local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) stun_addresses = [] for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(for_offer=True) local_sdp.media.append(media) stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S '))) if stun_addresses: local_sdp.connection.address = stun_addresses[0] route_header = RouteHeader(self.route.get_uri()) if is_focus: contact_header.parameters['isfocus'] = None if self.subject: extra_headers.append(SubjectHeader(self.subject)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=received_code, reason=received_reason, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self, ) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.greenlet = None self.end() return notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=received_code, reason=received_reason, error=error) except InvitationDisconnectedError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = datetime.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=received_code, reason=received_reason, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() class SessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account = AccountManager().sylkserver_account notification.sender.send_response(100) session = ServerSession(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification)