diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py index 94f7e86..2cf4947 100644 --- a/sylk/applications/conference/room.py +++ b/sylk/applications/conference/room.py @@ -1,1196 +1,1196 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import hashlib import os import random import re import shutil import string import weakref from collections import defaultdict from datetime import datetime from glob import glob from itertools import chain, cycle from time import mktime try: from weakref import WeakSet except ImportError: from backports.weakref import WeakSet -from application.notification import IObserver, NotificationCenter +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.system import makedirs from eventlet import api, coros, proc from sipsimple.account import AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import WavePlayer, WavePlayerError from sipsimple.conference import AudioConference from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import conference from sipsimple.streams import FileTransferStream from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.streams.msrp import ChatStreamError, FileSelector from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread -from sipsimple.util import Timestamp, TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.conference import database from sylk.applications.conference.configuration import ConferenceConfig, URL from sylk.bonjour import BonjourServices from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.datatypes import ResourcePath from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) def format_identity(identity, cpim_format=False): uri = identity.uri if identity.display_name: return u'%s ' % (identity.display_name, uri.user, uri.host) elif cpim_format: return u'' % (uri.user, uri.host) else: return u'sip:%s@%s' % (uri.user, uri.host) class ScreenImage(object): def __init__(self, room, sender): self.room = weakref.ref(room) self.sender = sender self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10)))) from sylk.applications.conference import ConferenceApplication port = ConferenceApplication().screen_sharing_web_server.port scheme = 'https' if ConferenceConfig.screen_sharing_use_https else 'http' self.url = URL('%s://%s:%s/' % (scheme, ConferenceConfig.screen_sharing_ip, port)) self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename)) self.state = None self.timer = None @property def active(self): return self.state == 'active' @property def idle(self): return self.state == 'idle' @run_in_thread('file-io') def save(self, image): makedirs(os.path.dirname(self.filename)) tmp_filename = self.filename + '.tmp' try: with open(tmp_filename, 'wb') as file: file.write(image) except EnvironmentError, e: log.msg('Cannot write screen sharing image: %s: %s' % (self.filename, e)) else: try: os.rename(tmp_filename, self.filename) except EnvironmentError: pass self.advertise() @run_in_twisted_thread def advertise(self): if self.state == 'active': self.timer.reset(10) else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'active' self.timer = reactor.callLater(10, self.stop_advertising) room = self.room() or Null room.dispatch_conference_info() txt = '%s is sharing the screen at %s' % (format_identity(self.sender, cpim_format=True), self.url) room.dispatch_server_message(txt) log.msg(txt) @run_in_twisted_thread def stop_advertising(self): if self.state != 'idle': if self.timer is not None and self.timer.active(): self.timer.cancel() self.state = 'idle' self.timer = None room = self.room() or Null room.dispatch_conference_info() txt = '%s stopped sharing the screen' % format_identity(self.sender, cpim_format=True) room.dispatch_server_message(txt) log.msg(txt) class Room(object): """ Object representing a conference room, it will handle the message dispatching among all the participants. """ implements(IObserver) def __init__(self, uri): self._channel = coros.queue() self.uri = uri self.identity = CPIMIdentity.parse('' % self.uri) self.identity.display_name = 'Conference Room' self.files = [] self.screen_images = {} self.sessions = [] self.sessions_with_proposals = {} self.subscriptions = [] self.transfer_handlers = WeakSet() self.state = 'stopped' self.incoming_message_queue = coros.queue() self.message_dispatcher = None self.audio_conference = None self.moh_player = None self.conference_info_payload = None self.bonjour_services = Null() self.session_nickname_map = {} self.last_nicknames_map = {} self.participants_counter = defaultdict(lambda: 0) @property def empty(self): return len(self.sessions) == 0 @property def started(self): return self.state == 'started' @property def stopping(self): return self.state in ('stopping', 'stopped') @property def active_media(self): return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams)))) def start(self): if self.started: return from sylk.applications.conference import ConferenceApplication if ConferenceApplication().bonjour_services is not Null: room_user = self.identity.uri.user self.bonjour_services = BonjourServices(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user) self.bonjour_services.start() self.message_dispatcher = proc.spawn(self._message_dispatcher) self.audio_conference = AudioConference() self.audio_conference.hold() self.moh_player = MoHPlayer(self.audio_conference) self.moh_player.initialize() self.state = 'started' @run_in_waitable_green_thread def stop(self): if not self.started: return self.state = 'stopping' self.bonjour_services.stop() self.bonjour_services = None self.incoming_message_queue.send_exception(api.GreenletExit) self.incoming_message_queue = None self.message_dispatcher.kill(proc.ProcExit) self.message_dispatcher = None self.moh_player.stop() self.moh_player = None self.audio_conference = None procs = [proc.spawn(handler.stop) for handler in self.transfer_handlers] proc.waitall(procs) [subscription.end() for subscription in self.subscriptions] wait_count = len(self.subscriptions) while wait_count > 0: notification = self._channel.wait() if notification.name == 'SIPIncomingSubscriptionDidEnd': wait_count -= 1 self.subscriptions = [] self.cleanup_files() self.conference_info_payload = None self.state = 'stopped' @run_in_thread('file-io') def cleanup_files(self): path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass path = os.path.join(ConferenceConfig.screen_sharing_dir, self.uri) try: shutil.rmtree(path) except EnvironmentError: pass def _message_dispatcher(self): """Read from self.incoming_message_queue and dispatch the messages to other participants""" while True: session, message_type, data = self.incoming_message_queue.wait() if message_type == 'message': message = data.message if message.sender.uri != session.remote_identity.uri: return - if data.timestamp is not None and isinstance(message.timestamp, Timestamp): - timestamp = datetime.fromtimestamp(mktime(message.timestamp.timetuple())) + if message.timestamp is not None: + value = message.timestamp + timestamp = datetime(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, value.tzinfo) else: - timestamp = datetime.now() + timestamp = datetime.utcnow() recipient = message.recipients[0] sender = message.sender sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), sender.display_name) message.sender = sender database.async_save_message(format_identity(session.remote_identity, True), self.uri, message.body, message.content_type, unicode(message.sender), unicode(recipient), timestamp) private = len(message.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_message(session, message) else: self.dispatch_message(session, message) elif message_type == 'composing_indication': if data.sender.uri != session.remote_identity.uri: return recipient = data.recipients[0] private = len(data.recipients) == 1 and recipient.uri != self.identity.uri if private: self.dispatch_private_iscomposing(session, data) else: self.dispatch_iscomposing(session, data) def dispatch_message(self, session, message): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_message(self, session, message): # Private messages are delivered to all sessions matching the recipient but also to the sender, # for replication in clients recipient = message.recipients[0] for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: try: chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers) except ChatStreamError, e: log.error(u'Error dispatching private message to %s: %s' % (s.remote_identity.uri, e)) def dispatch_iscomposing(self, session, data): for s in (s for s in self.sessions if s is not session): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: pass else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity, recipients=[self.identity]) except ChatStreamError, e: log.error(u'Error dispatching composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_private_iscomposing(self, session, data): recipient_uri = data.recipients[0].uri for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri): try: chat_stream = (stream for stream in s.streams if stream.type == 'chat').next() except StopIteration: continue else: identity = CPIMIdentity.parse(format_identity(session.remote_identity, True)) try: chat_stream.send_composing_indication(data.state, data.refresh, local_identity=identity) except ChatStreamError, e: log.error(u'Error dispatching private composing indication to %s: %s' % (s.remote_identity.uri, e)) def dispatch_server_message(self, body, content_type='text/plain', exclude=None): for session in (session for session in self.sessions if session is not exclude): try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity]) self_identity = format_identity(self.identity, cpim_format=True) database.async_save_message(self_identity, self.uri, body, content_type, self_identity, self_identity, datetime.now()) def dispatch_conference_info(self): data = self.build_conference_info_payload() for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'): try: subscription.push_content(conference.ConferenceDocument.content_type, data) except (SIPCoreError, SIPCoreInvalidStateError): pass def dispatch_file(self, file): sender_uri = CPIMIdentity.parse(file.sender).uri for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)): handler = OutgoingFileTransferHandler(self, uri, file) self.transfer_handlers.add(handler) handler.start() def add_session(self, session): notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) self.sessions.append(session) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] += 1 try: chat_stream = (stream for stream in session.streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if transfer_stream.direction == 'recvonly': transfer_handler = IncomingFileTransferHandler(self, session) transfer_handler.start() txt = u'%s is uploading file %s (%s)' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size)) else: transfer_handler = OutgoingFileTransferRequestHandler(self, session) transfer_handler.start() txt = u'%s requested file %s' % (format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8')) log.msg(txt) self.dispatch_server_message(txt) if len(session.streams) == 1: return welcome_handler = WelcomeHandler(self, session) welcome_handler.start() self.dispatch_conference_info() if len(self.sessions) == 1: log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) else: log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, self.format_stream_types(session.streams))) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session) def remove_session(self, session): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=session) self.sessions.remove(session) self.session_nickname_map.pop(session, None) remote_uri = str(session.remote_identity.uri) self.participants_counter[remote_uri] -= 1 if self.participants_counter[remote_uri] <= 0: del self.participants_counter[remote_uri] self.last_nicknames_map.pop(remote_uri, None) try: timer = self.sessions_with_proposals.pop(session) except KeyError: pass else: if timer.active(): timer.cancel() try: chat_stream = (stream for stream in session.streams or [] if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) try: audio_stream = (stream for stream in session.streams or [] if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() try: transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next() except StopIteration: pass else: if len(session.streams) == 1: return self.dispatch_conference_info() log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, self.format_session_duration(session))) if not self.sessions: log.msg(u'Last participant left conference %s' % self.uri) if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session): self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session))) def terminate_sessions(self, uri): if not self.started: return for session in (session for session in self.sessions if session.remote_identity.uri == uri): session.end() def build_conference_info_payload(self): if self.conference_info_payload is None: settings = SIPSimpleSettings() conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent) host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com')) self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users()) user_count = len(self.participants_counter.keys()) self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True) users = conference.Users() for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')): try: user = (user for user in users if user.entity == str(session.remote_identity.uri)).next() except StopIteration: display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name) if self.bonjour_services is Null: user = conference.User(str(session.remote_identity.uri), display_text=display_text) else: user = conference.User(str(session._invitation.remote_contact_header.uri), display_text=display_text) user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host) screen_image = self.screen_images.get(user_uri, None) if screen_image is not None and screen_image.active: user.screen_image_url = screen_image.url users.add(user) joining_info = conference.JoiningInfo(when=session.start_time) holdable_streams = [stream for stream in session.streams if stream.hold_supported] session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams) hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected') display_text = self.session_nickname_map.get(session, session.remote_identity.display_name) endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status) for stream in session.streams: if stream.type == 'file-transfer': continue endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream))) user.add(endpoint) self.conference_info_payload.users = users if self.files: files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files) self.conference_info_payload.conference_description.resources = conference.Resources(files=files) return self.conference_info_payload.toxml() def handle_incoming_subscription(self, subscribe_request, data): if subscribe_request.event != 'conference': subscribe_request.reject(489) return notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscribe_request) data = self.build_conference_info_payload() subscribe_request.accept(conference.ConferenceDocument.content_type, data) self.subscriptions.append(subscribe_request) def accept_proposal(self, session, streams): self.sessions_with_proposals.pop(session) session.accept_proposal(streams) def add_file(self, file): if file.status == 'INCOMPLETE': self.dispatch_server_message('%s has cancelled upload of file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) else: self.dispatch_server_message('%s has uploaded file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size))) self.files.append(file) self.dispatch_conference_info() if ConferenceConfig.push_file_transfer: self.dispatch_file(file) def add_screen_image(self, sender, image): sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host) screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender)) screen_image.save(image) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_AudioStreamDidTimeout(self, notification): stream = notification.sender session = stream._session log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity)) if session.streams == [stream]: session.end() def _NH_ChatStreamGotMessage(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session message = data.message content_type = message.content_type.lower() if content_type.startswith('text/'): self.incoming_message_queue.send((session, 'message', data)) elif content_type == 'application/blink-screensharing': self.add_screen_image(message.sender, message.body) def _NH_ChatStreamGotComposingIndication(self, notification): stream = notification.sender stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') data = notification.data session = notification.sender.session self.incoming_message_queue.send((session, 'composing_indication', data)) def _NH_ChatStreamGotNicknameRequest(self, notification): nickname = notification.data.nickname session = notification.sender.session chunk = notification.data.chunk if nickname: if nickname in self.session_nickname_map.values() and self.session_nickname_map[session] != nickname: notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use') return self.session_nickname_map[session] = nickname self.last_nicknames_map[str(session.remote_identity.uri)] = nickname else: self.session_nickname_map.pop(session, None) self.last_nicknames_map.pop(str(session.remote_identity.uri), None) notification.sender.accept_nickname(chunk) self.dispatch_conference_info() def _NH_SIPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender notification_center = NotificationCenter() notification_center.remove_observer(self, sender=subscription) if self.state == 'stopping': self._channel.send(notification) else: self.subscriptions.remove(subscription) def _NH_SIPSessionDidChangeHoldState(self, notification): session = notification.sender if notification.data.originator == 'remote': if notification.data.on_hold: log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity)) else: log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity)) self.dispatch_conference_info() def _NH_SIPSessionGotProposal(self, notification): session = notification.sender audio_streams = [stream for stream in notification.data.streams if stream.type=='audio'] chat_streams = [stream for stream in notification.data.streams if stream.type=='chat'] if not audio_streams and not chat_streams: session.reject_proposal() return streams = [streams[0] for streams in (audio_streams, chat_streams) if streams] timer = reactor.callLater(4, self.accept_proposal, session, streams) self.sessions_with_proposals[session] = timer def _NH_SIPSessionGotRejectProposal(self, notification): session = notification.sender try: timer = self.sessions_with_proposals.pop(session) except KeyError: # If the proposal couldn't be accepted by us we will not add a timer pass else: if timer.active(): timer.cancel() def _NH_SIPSessionDidRenegotiateStreams(self, notification): notification_center = NotificationCenter() session = notification.sender streams = notification.data.streams if notification.data.action == 'add': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.add_observer(self, sender=chat_stream) log.msg(u'%s has added chat to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.add_observer(self, sender=audio_stream) log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate, 'encrypted' if audio_stream.srtp_active else 'unencrypted', audio_stream.local_rtp_address, audio_stream.local_rtp_port, audio_stream.remote_rtp_address, audio_stream.remote_rtp_port)) log.msg(u'%s has added audio to %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has added audio' % format_identity(session.remote_identity), exclude=session) welcome_handler = WelcomeHandler(self, session) welcome_handler.start(welcome_prompt=False) elif notification.data.action == 'remove': try: chat_stream = (stream for stream in streams if stream.type == 'chat').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=chat_stream) log.msg(u'%s has removed chat from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed chat' % format_identity(session.remote_identity), exclude=session) try: audio_stream = (stream for stream in streams if stream.type == 'audio').next() except StopIteration: pass else: notification_center.remove_observer(self, sender=audio_stream) try: self.audio_conference.remove(audio_stream) except ValueError: # User may hangup before getting bridged into the conference pass if len(self.audio_conference.streams) == 0: self.moh_player.pause() self.audio_conference.hold() elif len(self.audio_conference.streams) == 1: self.moh_player.play() log.msg(u'%s has removed audio from %s' % (format_identity(session.remote_identity), self.uri)) self.dispatch_server_message('%s has removed audio' % format_identity(session.remote_identity), exclude=session) if not session.streams: log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri)) session.end() self.dispatch_conference_info() def _NH_SIPSessionTransferNewIncoming(self, notification): notification.sender.reject_transfer(403) @staticmethod def format_stream_types(streams): if not streams: return '' if len(streams) == 1: txt = 'with %s' % streams[0].type else: txt = 'with %s' % ','.join(stream.type for stream in streams[:-1]) txt += ' and %s' % streams[-1:][0].type return txt @staticmethod def format_conference_stream_type(stream): if stream.type == 'chat': return 'message' return stream.type @staticmethod def format_session_duration(session): if session.start_time: duration = session.end_time - session.start_time seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1 minutes, seconds = seconds / 60, seconds % 60 hours, minutes = minutes / 60, minutes % 60 hours += duration.days*24 if not minutes and not hours: duration_text = '%d seconds' % seconds elif not hours: duration_text = '%02d:%02d' % (minutes, seconds) else: duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds) else: duration_text = '0s' return duration_text @staticmethod def format_file_size(size): infinite = float('infinity') boundaries = [( 1024, '%d bytes', 1), ( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0), ( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0), (10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)] for boundary, format, divisor in boundaries: if size < boundary: return format % (size/divisor,) else: return "%d bytes" % size class MoHPlayer(object): implements(IObserver) def __init__(self, conference): self.conference = conference self.files = None self.paused = True self._player = None def initialize(self): files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized) if not files: log.error(u'No files found, MoH is disabled') return random.shuffle(files) self.files = cycle(files) self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_play=False, volume=20) self.conference.bridge.add(self._player) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._player) def stop(self): if self._player is None: return notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._player) self._player.stop() self.conference.bridge.remove(self._player) self.conference = None def play(self): if self._player is not None and self.paused: self.paused = False self._play_next_file() log.msg(u'Started playing music on hold') def pause(self): if self._player is not None and not self.paused: self.paused = True self._player.stop() log.msg(u'Stopped playing music on hold') def _play_next_file(self): self._player.filename = self.files.next() self._player.play() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_WavePlayerDidFail(self, notification): if not self.paused: self._play_next_file() def _NH_WavePlayerDidEnd(self, notification): if not self.paused: self._play_next_file() class InterruptWelcome(Exception): pass class WelcomeHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.procs = proc.RunningProcSet() @run_in_green_thread def start(self, welcome_prompt=True): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) self.procs.spawn(self.play_audio_welcome, welcome_prompt) self.procs.spawn(self.render_chat_welcome, welcome_prompt) self.procs.waitall() notification_center.remove_observer(self, sender=self.session) self.session = None self.room = None def play_file_in_player(self, player, file, delay): player.filename = file player.pause_time = delay try: player.play().wait() except WavePlayerError, e: log.warning(u"Error playing file %s: %s" % (file, e)) def play_audio_welcome(self, welcome_prompt): try: audio_stream = (stream for stream in self.session.streams if stream.type == 'audio').next() except StopIteration: return try: player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_play=False, volume=50) audio_stream.bridge.add(player) if welcome_prompt: file = ResourcePath('sounds/co_welcome_conference.wav').normalized self.play_file_in_player(player, file, 1) user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)])) if user_count == 0: file = ResourcePath('sounds/co_only_one.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count == 1: file = ResourcePath('sounds/co_there_is.wav').normalized self.play_file_in_player(player, file, 0.5) elif user_count < 100: file = ResourcePath('sounds/co_there_are.wav').normalized self.play_file_in_player(player, file, 0.2) if user_count <= 24: file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized self.play_file_in_player(player, file, 0.1) else: file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized self.play_file_in_player(player, file, 0.1) file = ResourcePath('sounds/co_more_participants.wav').normalized self.play_file_in_player(player, file, 0) file = ResourcePath('sounds/connected_tone.wav').normalized self.play_file_in_player(player, file, 0.1) audio_stream.bridge.remove(player) except InterruptWelcome: try: audio_stream.bridge.remove(player) except ValueError: pass else: self.room.audio_conference.add(audio_stream) self.room.audio_conference.unhold() if len(self.room.audio_conference.streams) == 1: self.room.moh_player.play() else: self.room.moh_player.pause() def render_chat_welcome_prompt(self): txt = 'Welcome to the conference.' user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)])) if user_count == 0: txt += ' You are the first participant in the room.' else: if user_count == 1: txt += ' There is one more participant in the room.' else: txt += ' There are %s more participants in the room.' % user_count return txt def render_chat_welcome(self, welcome_prompt): try: chat_stream = (stream for stream in self.session.streams if stream.type == 'chat').next() except StopIteration: return try: #welcome_prompt = self.render_chat_welcome_prompt() #chat_stream.send_message(welcome_prompt, 'text/plain', local_identity=self.room.identity, recipients=[self.room.identity]) remote_identity = CPIMIdentity.parse(format_identity(self.session.remote_identity, cpim_format=True)) for msg in database.get_last_messages(self.room.uri, ConferenceConfig.replay_history): recipient = CPIMIdentity.parse(msg.cpim_recipient) sender = CPIMIdentity.parse(msg.cpim_sender) if recipient.uri in (self.room.identity.uri, remote_identity.uri) or sender.uri == remote_identity.uri: chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp) except InterruptWelcome: pass def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionWillEnd(self, notification): self.procs.killall(InterruptWelcome) class RoomFile(object): def __init__(self, name, hash, size, sender, status): self.name = name self.hash = hash self.size = size self.sender = sender self.status = status @property def file_selector(self): return FileSelector.for_file(self.name.encode('utf-8'), hash=self.hash) class IncomingFileTransferHandler(object): implements(IObserver) def __init__(self, room, session): self.room = room self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly').next() self.error = False self.ended = False self.file = None self.file_selector = None self.filename = None self.hash = None self.status = None self.timer = None self.transfer_finished = False def start(self): self.file_selector = self.stream.file_selector path = os.path.join(ConferenceConfig.file_transfer_dir, self.room.uri) makedirs(path) self.filename = filename = os.path.join(path, self.file_selector.name.decode('utf-8')) basename, ext = os.path.splitext(filename) i = 1 while os.path.exists(filename): filename = '%s_%d%s' % (basename, i, ext) i += 1 self.filename = filename try: self.file = open(self.filename, 'wb') except EnvironmentError: log.msg('Cannot write destination filename: %s' % self.filename) self.session.end() return notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) self.hash = hashlib.sha1() @run_in_thread('file-transfer') def write_chunk(self, data): notification_center = NotificationCenter() if data is not None: try: self.file.write(data) except EnvironmentError, e: - notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=TimestampedNotificationData(error=str(e))) + notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e))) else: self.hash.update(data) else: self.file.close() if self.error: - notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self) else: - notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self) @run_in_thread('file-io') def remove_bogus_file(self, filename): try: os.unlink(filename) except OSError: pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidEnd(self, notification): self.ended = True if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) # Mark end of write operation self.write_chunk(None) def _NH_FileTransferStreamGotChunk(self, notification): self.write_chunk(notification.data.content) def _NH_FileTransferStreamDidFinish(self, notification): self.transfer_finished = True if self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerGotError(self, notification): log.error('Error while handling incoming file transfer: %s' % notification.data.error) self.error = True self.status = notification.data.error if not self.ended and self.timer is None: self.timer = reactor.callLater(5, self.session.end) def _NH_IncomingFileTransferHandlerDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) remote_hash = self.file_selector.hash if not self.transfer_finished: log.msg('File transfer of %s cancelled' % os.path.basename(self.filename)) self.remove_bogus_file(self.filename) self.status = 'INCOMPLETE' else: local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper())) if local_hash != remote_hash: log.warning('Hash of transferred file does not match the remote hash (file may have changed).') self.status = 'Hash missmatch' self.remove_bogus_file(self.filename) else: self.status = 'OK' file = RoomFile(self.filename, remote_hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) self.room.add_file(file) self.session = None self.stream = None self.room = None def _NH_IncomingFileTransferHandlerDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self) file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status) self.room.add_file(file) self.session = None self.stream = None self.room = None class OutgoingFileTransferRequestHandler(object): implements(IObserver) def __init__(self, room, session): self._channel = coros.queue() self.room = room self.session = session self.stream = (stream for stream in self.session.streams if stream.type == 'file-transfer').next() self.timer = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) while True: notification = self._channel.wait() if notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): break if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None self.room = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidFail(self, notification): self._channel.send(notification) def _NH_SIPSessionDidEnd(self, notification): self._channel.send(notification) class InterruptFileTransfer(Exception): pass class OutgoingFileTransferHandler(object): implements(IObserver) def __init__(self, room, destination, file): self._channel = coros.queue() self.greenlet = None self.room = room self.destination = destination self.file = file self.session = None self.stream = None self.timer = None @run_in_green_thread def start(self): self.greenlet = api.getcurrent() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI.new(self.destination) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except (DNSLookupError, InterruptFileTransfer): self.greenlet = None self.room = None else: notification_center = NotificationCenter() self.session = ServerSession(account) self.stream = FileTransferStream(account, self.file.file_selector, 'sendonly') notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.stream) subject = u'File uploaded by %s' % self.file.sender from_header = FromHeader(SIPURI.new(self.room.identity.uri), u'Conference File Transfer') to_header = ToHeader(SIPURI.new(self.destination)) transport = routes[0].transport parameters = {} if transport=='udp' else {'transport': transport} contact_header = ContactHeader(SIPURI(user=self.room.identity.uri.user, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters)) extra_headers = [] if ThorNodeConfig.enabled: extra_headers.append(Header('Thor-Scope', 'conference-invitation')) originator_uri = CPIMIdentity.parse(self.file.sender).uri extra_headers.append(Header('X-Originator-From', str(originator_uri))) self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, subject=subject, extra_headers=extra_headers) try: while True: notification = self._channel.wait() if notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): break except InterruptFileTransfer: self.session.end() else: if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None self.greenlet = None notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None self.room = None def stop(self): # Needs to be called from a green thread if self.greenlet is None: return api.kill(self.greenlet, InterruptFileTransfer) self.greenlet = api.getcurrent() while True: notification = self._channel.wait() if notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): break if self.timer is not None and self.timer.active(): self.timer.cancel() self.timer = None self.greenlet = None notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) self.session = None self.stream = None self.room = None def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_FileTransferStreamDidFinish(self, notification): if self.timer is None: self.timer = reactor.callLater(2, self.session.end) def _NH_SIPSessionDidStart(self, notification): self._channel.send(notification) def _NH_SIPSessionDidFail(self, notification): self._channel.send(notification) def _NH_SIPSessionDidEnd(self, notification): self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py index cca206c..c4a477f 100644 --- a/sylk/applications/xmppgateway/im.py +++ b/sylk/applications/xmppgateway/im.py @@ -1,459 +1,458 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.descriptor import WriteOnceAttribute from collections import deque from eventlet import coros from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Message as SIPMessageRequest from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread -from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage from sylk.extensions import ChatStream from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError'] SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout class ChatSessionHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self): self.started = False self.ended = False self.sip_session = None self.msrp_stream = None self._sip_session_timer = None self.use_receipts = False self.xmpp_session = None self._xmpp_message_queue = deque() self._pending_msrp_chunks = {} self._pending_xmpp_stanzas = {} def _set_started(self, value): old_value = self.__dict__.get('started', False) self.__dict__['started'] = value if not old_value and value: notification_center = NotificationCenter() - notification_center.post_notification('ChatSessionDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('ChatSessionDidStart', sender=self) self._send_queued_messages() def _get_started(self): return self.__dict__['started'] started = property(_get_started, _set_started) del _get_started, _set_started def _set_xmpp_session(self, session): self.__dict__['xmpp_session'] = session if session is not None: # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) notification_center = NotificationCenter() notification_center.add_observer(self, sender=session) session.start() # Reet SIP session timer in case it's active if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) def _get_xmpp_session(self): return self.__dict__['xmpp_session'] xmpp_session = property(_get_xmpp_session, _set_xmpp_session) del _get_xmpp_session, _set_xmpp_session @classmethod def new_from_sip_session(cls, sip_identity, session): instance = cls() instance.sip_identity = sip_identity instance._start_incoming_sip_session(session) return instance @classmethod def new_from_xmpp_stanza(cls, xmpp_identity, recipient): instance = cls() instance.xmpp_identity = xmpp_identity instance._start_outgoing_sip_session(recipient) return instance @run_in_green_thread def _start_incoming_sip_session(self, session): self.sip_session = session self.msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.accept([self.msrp_stream]) @run_in_green_thread def _start_outgoing_sip_session(self, target_uri): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = target_uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) - notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('ChatSessionDidFail', sender=self) return self.msrp_stream = ChatStream(account) route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self.sip_session = ServerSession(account) notification_center.add_observer(self, sender=self.sip_session) notification_center.add_observer(self, sender=self.msrp_stream) self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream]) def end(self): if self.ended: return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.cancel() self._sip_session_timer = None notification_center = NotificationCenter() if self.sip_session is not None: notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session.end() self.sip_session = None self.msrp_stream = None if self.xmpp_session is not None: notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session.end() self.xmpp_session = None self.ended = True if self.started: - notification_center.post_notification('ChatSessionDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('ChatSessionDidEnd', sender=self) else: - notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('ChatSessionDidFail', sender=self) def enqueue_xmpp_message(self, message): if self.started: raise RuntimeError('session is already started') self._xmpp_message_queue.append(message) def _send_queued_messages(self): if self._xmpp_message_queue: while self._xmpp_message_queue: message = self._xmpp_message_queue.popleft() if message.body is None: continue if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' sender_uri = message.sender.uri.as_sip_uri() sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8')) sender = CPIMIdentity(sender_uri) self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _inactivity_timeout(self): log.msg("Ending SIP session %s due to incactivity" % self.sip_session._invitation.call_id) self.sip_session.end() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session %s started" % notification.sender._invitation.call_id) self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout) if self.sip_session.direction == 'outgoing': # Time to set sip_identity and create the XMPPChatSession contact_uri = self.sip_session._invitation.remote_contact_header.uri if contact_uri.parameters.get('gr') is not None: sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr')) else: tmp = self.sip_session.remote_identity.uri sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource()) self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name) session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) self.xmpp_session = session # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: if self.xmpp_session is not None: # Session is now established on both ends self.started = True # Try to wakeup XMPP clients self.xmpp_session.send_composing_indication('active') self.xmpp_session.send_message(' ', 'text/plain') else: # Try to wakeup XMPP clients sender = self.sip_identity tmp = self.sip_session.local_identity.uri recipient_uri = FrozenURI(tmp.user, tmp.host) recipient = Identity(recipient_uri) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain')) # Send queued messages self._send_queued_messages() def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session %s ended" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session %s failed" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.sip_session) notification_center.remove_observer(self, sender=self.msrp_stream) self.sip_session = None self.msrp_stream = None self.end() def _NH_SIPSessionGotProposal(self, notification): self.sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self.sip_session.reject_transfer(403) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) chunk = notification.data.chunk if self.started: self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id) if self.use_receipts: self._pending_msrp_chunks[chunk.message_id] = chunk else: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') else: sender = self.sip_identity recipient_uri = FrozenURI.parse(message.recipients[0].uri) recipient = Identity(recipient_uri, message.recipients[0].display_name) xmpp_manager = XMPPManager() xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body)) self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_ChatStreamGotComposingIndication(self, notification): # Notification is sent by the MSRP stream if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) if not self.started: return state = None if notification.data.state == 'active': state = 'composing' elif notification.data.state == 'idle': state = 'paused' if state is not None: self.xmpp_session.send_composing_indication(state) def _NH_ChatStreamDidDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_ChatStreamDidNotDeliverMessage(self, notification): if self.started: message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None) if message is not None: self.xmpp_session.send_error(message, 'TODO', []) # TODO def _NH_XMPPChatSessionDidStart(self, notification): if self.sip_session is not None: # Session is now established on both ends self.started = True def _NH_XMPPChatSessionDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.xmpp_session) self.xmpp_session = None self.end() def _NH_XMPPChatSessionGotMessage(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': self._xmpp_message_queue.append(notification.data.message) return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.use_receipts = message.use_receipt if not message.use_receipt: success_report = 'no' failure_report = 'no' else: success_report = 'yes' failure_report = 'yes' self._pending_xmpp_stanzas[message.id] = message # Prefer plaintext self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report) self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender) def _NH_XMPPChatSessionGotComposingIndication(self, notification): if self.sip_session is None or self.sip_session.state != 'connected': return if self._sip_session_timer is not None and self._sip_session_timer.active(): self._sip_session_timer.reset(SESSION_TIMEOUT) message = notification.data.message state = None if message.state == 'composing': state = 'active' elif message.state == 'paused': state = 'idle' if state is not None: sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri) self.msrp_stream.send_composing_indication(state, 30, local_identity=sender) if message.use_receipt: self.xmpp_session.send_receipt_acknowledgement(message.id) def _NH_XMPPChatSessionDidDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK') def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification): chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None) if chunk is not None: self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason) def chunks(text, size): for i in xrange(0, len(text), size): yield text[i:i+size] class SIPMessageError(Exception): def __init__(self, code, reason): Exception.__init__(self, reason) self.code = code self.reason = reason class SIPMessageSender(object): implements(IObserver) def __init__(self, message): self.from_uri = message.sender.uri.as_sip_uri() self.from_uri.parameters.pop('gr', None) # No GRUU in From header self.to_uri = message.recipient.uri.as_sip_uri() self.to_uri.parameters.pop('gr', None) # Don't send it to the GRUU self.body = message.body self.content_type = 'text/plain' self._requests = set() self._channel = coros.queue() @run_in_waitable_green_thread def send(self): lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = self.to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: msg = 'DNS lookup error while looking for %s proxy' % uri log.warning(msg) raise SIPMessageError(0, msg) else: route = routes.pop(0) from_header = FromHeader(self.from_uri) to_header = ToHeader(self.to_uri) route_header = RouteHeader(route.get_uri()) notification_center = NotificationCenter() for chunk in chunks(self.body, 1000): request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, self.body) notification_center.add_observer(self, sender=request) self._requests.add(request) request.send() error = None count = len(self._requests) while count > 0: notification = self._channel.wait() if notification.name == 'SIPMessageDidFail': error = (notification.data.code, notification.data.reason) count -= 1 self._requests.clear() if error is not None: raise SIPMessageError(*error) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPMessageDidSucceed(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) def _NH_SIPMessageDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=notification.sender) self._channel.send(notification) diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py index 7dd36ba..4e819b0 100644 --- a/sylk/applications/xmppgateway/muc.py +++ b/sylk/applications/xmppgateway/muc.py @@ -1,251 +1,250 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os import uuid from application.notification import IObserver, NotificationCenter from application.python import Null from application.python.descriptor import WriteOnceAttribute from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI from sipsimple.core import ContactHeader, FromHeader, ToHeader from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.streams.msrp import ChatStreamError from sipsimple.streams.applications.chat import CPIMIdentity from sipsimple.threading.green import run_in_green_thread -from sipsimple.util import TimestampedNotificationData from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.xmpp import XMPPManager from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, STANZAS_NS from sylk.extensions import ChatStream from sylk.session import ServerSession log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) class X2SMucHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity, nickname): self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.nickname = nickname self._xmpp_muc_session = None self._sip_session = None self._msrp_stream = None self._first_stanza = None self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza self._participants = set() # set of (URI, nickname) tuples self.ended = False def start(self): notification_center = NotificationCenter() self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session.start() - notification_center.post_notification('X2SMucHandlerDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('X2SMucHandlerDidStart', sender=self) self._start_sip_session() def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_muc_session is not None: notification_center.remove_observer(self, sender=self._xmpp_muc_session) # Send indication that the user has been kicked from the room sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('307') xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) self._xmpp_muc_session.end() self._xmpp_muc_session = None if self._sip_session is not None: notification_center.remove_observer(self, sender=self._sip_session) self._sip_session.end() self._sip_session = None self.ended = True - notification_center.post_notification('X2SMucHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('X2SMucHandlerDidEnd', sender=self) @run_in_green_thread def _start_sip_session(self): notification_center = NotificationCenter() # self.xmpp_identity is our local identity from_uri = self.xmpp_identity.uri.as_sip_uri() del from_uri.parameters['gr'] # no GRUU in From header contact_uri = self.xmpp_identity.uri.as_sip_uri() contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8')) to_uri = self.sip_identity.uri.as_sip_uri() lookup = DNSLookup() settings = SIPSimpleSettings() account = AccountManager().default_account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = to_uri try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError: log.warning('DNS lookup error while looking for %s proxy' % uri) - notification_center.post_notification('ChatSessionDidFail', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('ChatSessionDidFail', sender=self) return self._msrp_stream = ChatStream(account) route = routes.pop(0) from_header = FromHeader(from_uri) to_header = ToHeader(to_uri) contact_header = ContactHeader(contact_uri) self._sip_session = ServerSession(account) notification_center.add_observer(self, sender=self._sip_session) notification_center.add_observer(self, sender=self._msrp_stream) self._sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self._msrp_stream]) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): log.msg("SIP session (MUC) %s started" % notification.sender._invitation.call_id) if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed: self.end() return message_id = self._msrp_stream.set_local_nickname(self.nickname) self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza) self._first_stanza = None def _NH_SIPSessionDidEnd(self, notification): log.msg("SIP session (MUC) %s ended" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._sip_session) notification_center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionDidFail(self, notification): log.msg("SIP session (MUC) %s failed" % notification.sender._invitation.call_id) notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._sip_session) notification_center.remove_observer(self, sender=self._msrp_stream) self._sip_session = None self._msrp_stream = None self.end() def _NH_SIPSessionGotProposal(self, notification): self._sip_session.reject_proposal() def _NH_SIPSessionTransferNewIncoming(self, notification): self._sip_session.reject_transfer(403) def _NH_SIPSessionGotConferenceInfo(self, notification): # Translate to XMPP payload xmpp_manager = XMPPManager() own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host) conference_info = notification.data.conference_info new_participants = set() for user in conference_info.users: user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity) nickname = user.display_text.value if user.display_text else user.entity new_participants.add((user_uri, nickname)) # Remove participants that are no longer in the room for uri, nickname in self._participants - new_participants: sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False) xmpp_manager.send_muc_stanza(stanza) # Send presence for current participants for uri, nickname in new_participants: if uri == own_uri: continue sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = Identity(uri) xmpp_manager.send_muc_stanza(stanza) self._participants = new_participants # Send own status last sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname)) stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True) stanza.jid = self.xmpp_identity stanza.muc_statuses.append('110') xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamGotMessage(self, notification): # Notification is sent by the MSRP stream if not self._xmpp_muc_session: return message = notification.data.message content_type = message.content_type.lower() if content_type not in ('text/plain', 'text/html'): return if content_type == 'text/plain': html_body = None body = message.body else: html_body = message.body body = None resource = message.sender.display_name or str(message.sender.uri) sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource)) self._xmpp_muc_session.send_message(sender, body, html_body, message_id=str(uuid.uuid4())) self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK') def _NH_ChatStreamDidSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) self.nickname = nickname def _NH_ChatStreamDidNotSetNickname(self, notification): # Notification is sent by the MSRP stream nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id) error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)]) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(error_stanza) def _NH_ChatStreamDidDeliverMessage(self, notification): # Echo back the message to the sender stanza = self._pending_messages_map.pop(notification.data.message_id) stanza.sender, stanza.recipient = stanza.recipient, stanza.sender stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname) xmpp_manager = XMPPManager() xmpp_manager.send_muc_stanza(stanza) def _NH_ChatStreamDidNotDeliverMessage(self, notification): self._pending_messages_map.pop(notification.data.message_id) def _NH_XMPPIncomingMucSessionDidEnd(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._xmpp_muc_session) self._xmpp_muc_session = None self.end() def _NH_XMPPIncomingMucSessionGotMessage(self, notification): if not self._sip_session: return message = notification.data.message sender_uri = message.sender.uri.as_sip_uri() del sender_uri.parameters['gr'] # no GRUU in CPIM From header sender = CPIMIdentity(sender_uri, display_name=self.nickname) message_id = self._msrp_stream.send_message(message.body, 'text/plain', local_identity=sender) self._pending_messages_map[message_id] = message # Message will be echoed back to the sender on ChatStreamDidDeliverMessage def _NH_XMPPIncomingMucSessionChangedNickname(self, notification): if not self._sip_session: return nickname = notification.data.nickname try: message_id = self._msrp_stream.set_local_nickname(nickname) except ChatStreamError: return self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza) diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py index e925d79..07a2bf7 100644 --- a/sylk/applications/xmppgateway/presence.py +++ b/sylk/applications/xmppgateway/presence.py @@ -1,481 +1,480 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import hashlib import os import random import urllib from application.notification import IObserver, NotificationCenter from application.python import Null, limit from application.python.descriptor import WriteOnceAttribute from eventlet import coros, proc from sipsimple.account import AccountManager from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SIPURI, SIPCoreError from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader from sipsimple.core import Subscription from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import pidf, rpid from sipsimple.payloads import ParserError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread -from sipsimple.util import TimestampedNotificationData from time import time from twisted.internet import reactor from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription from sylk.configuration import SIPConfig log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) __all__ = ['S2XPresenceHandler', 'X2SPresenceHandler'] class S2XPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self._sip_subscriptions = [] self._stanza_cache = {} self._pidf = None self._xmpp_subscription = None self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() - notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None while self._sip_subscriptions: subscription = self._sip_subscriptions.pop() notification_center.remove_observer(self, sender=subscription) try: subscription.end() except SIPCoreError: pass self.ended = True - notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self) def add_sip_subscription(self, subscription): self._sip_subscriptions.append(subscription) notification_center = NotificationCenter() notification_center.add_observer(self, sender=subscription) if self._xmpp_subscription.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None subscription.accept(content_type, pidf_doc) else: subscription.accept_pending() def _build_pidf(self): if not self._stanza_cache: self._pidf = None return None pidf_doc = pidf.PIDF(str(self.xmpp_identity)) uri = self._stanza_cache.iterkeys().next() person = pidf.Person("ID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()) person.activities = rpid.Activities() pidf_doc.add(person) for stanza in self._stanza_cache.itervalues(): if not stanza.available: status = pidf.Status('closed') status.extended = 'offline' else: status = pidf.Status('open') if stanza.show == 'away': status.extended = 'away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'xa': status.extended = 'extended-away' if 'away' not in person.activities: person.activities.add('away') elif stanza.show == 'dnd': stanza.extended = 'busy' if 'busy' not in person.activities: person.activities.add('busy') else: stanza.extended = 'available' resource = encode_resource(stanza.sender.uri.resource) tuple_id = "ID-%s" % resource sip_uri = stanza.sender.uri.as_sip_uri() sip_uri.parameters['gr'] = resource contact = pidf.Contact(str(sip_uri)) tuple = pidf.Service(tuple_id, status=status, contact=contact) tuple.add(pidf.DeviceID(resource)) tuple.device_info = pidf.DeviceInfo(resource, description=urllib.quote(stanza.sender.uri.resource.encode('utf-8'))) for lang, note in stanza.statuses.iteritems(): tuple.notes.add(pidf.PIDFNote(note, lang=lang)) pidf_doc.add(tuple) if not person.activities: person.activities = None self._pidf = pidf_doc.toxml() return self._pidf def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) @run_in_twisted_thread def _NH_SIPIncomingSubscriptionDidEnd(self, notification): notification_center = NotificationCenter() subscription = notification.sender notification_center.remove_observer(self, sender=subscription) self._sip_subscriptions.remove(subscription) if not self._sip_subscriptions: self.end() def _NH_XMPPSubscriptionChangedState(self, notification): if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active': pidf_doc = self._pidf content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'): subscription.accept(content_type, pidf_doc) def _NH_XMPPSubscriptionGotNotify(self, notification): stanza = notification.data.presence self._stanza_cache[stanza.sender.uri] = stanza pidf_doc = self._build_pidf() for subscription in self._sip_subscriptions: try: subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc) except SIPCoreError: pass if not stanza.available: # Only inform once about this device being unavailable del self._stanza_cache[stanza.sender.uri] def _NH_XMPPSubscriptionDidFail(self, notification): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription = None self.end() _NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class SubscriptionError(Exception): def __init__(self, error, timeout, refresh_interval=None, fatal=False): self.error = error self.refresh_interval = refresh_interval self.timeout = timeout self.fatal = fatal class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class X2SPresenceHandler(object): implements(IObserver) sip_identity = WriteOnceAttribute() xmpp_identity = WriteOnceAttribute() def __init__(self, sip_identity, xmpp_identity): self.ended = False self.sip_identity = sip_identity self.xmpp_identity = xmpp_identity self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._sip_subscription = None self._sip_subscription_proc = None self._sip_subscription_timer = None self._xmpp_subscription = None def start(self): notification_center = NotificationCenter() self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity) notification_center.add_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.start() self._command_proc = proc.spawn(self._run) self._subscribe_sip() - notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self) def end(self): if self.ended: return notification_center = NotificationCenter() if self._xmpp_subscription is not None: notification_center.remove_observer(self, sender=self._xmpp_subscription) self._xmpp_subscription.end() self._xmpp_subscription = None if self._sip_subscription: self._unsubscribe_sip() self.ended = True - notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self) @run_in_green_thread def _subscribe_sip(self): command = Command('subscribe') self._command_channel.send(command) @run_in_green_thread def _unsubscribe_sip(self): command = Command('unsubscribe') self._command_channel.send(command) command.wait() self._command_proc.kill() self._command_proc = None def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_subscribe(self, command): if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._sip_subscription_timer is not None and self._sip_subscription_timer.active(): self._sip_subscription_timer.cancel() self._sip_subscription_timer = None if self._sip_subscription_proc is not None: subscription_proc = self._sip_subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._sip_subscription_proc = None command.signal() def _process_pidf(self, body): try: pidf_doc = pidf.PIDF.parse(body) except ParserError, e: log.warn('Error parsing PIDF document: %s' % e) return # Build XML stanzas out of PIDF documents try: person = (p for p in pidf_doc.persons).next() except StopIteration: person = None for service in pidf_doc.services: sip_contact = self.sip_identity.uri.as_sip_uri() if service.device_info is not None: sip_contact.parameters['gr'] = service.device_info.id else: sip_contact.parameters['gr'] = service.id # TODO: pseudorandom thing with AoR? sender = Identity(FrozenURI.parse(sip_contact)) if service.status.extended is not None: available = service.status.extended != 'offline' else: available = service.status.basic == 'open' stanza = AvailabilityPresence(sender, self.xmpp_identity, available) for note in service.notes: stanza.statuses[note.lang] = note if service.status.extended is not None: if service.status.extended == 'away': stanza.show = 'away' elif service.status.extended == 'extended-away': stanza.show = 'xa' elif service.status.extended == 'busy': stanza.show = 'dnd' elif person is not None and person.activities is not None: activities = set(list(person.activities)) if 'away' in activities: stanza.show = 'away' elif set(('holiday', 'vacation')).intersection(activities): stanza.show = 'xa' elif 'busy' in activities: stanza.show = 'dnd' self._xmpp_subscription.send_presence(stanza) def _sip_subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() account = AccountManager().default_account refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval try: # Lookup routes if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) else: uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters) subscription_uri = self.sip_identity.uri.as_sip_uri() subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()), ToHeader(subscription_uri), ContactHeader(contact_uri), 'presence', RouteHeader(route.get_uri()), refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) raise SubscriptionError(error='Internal error', timeout=5) self._sip_subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._sip_subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120)) elif e.data.code == 403: # Forbidden raise SubscriptionError(error='Forbidden', timeout=None, fatal=True) elif e.data.code == 423: # Get the value of the Min-Expires header if e.data.min_expires is not None and e.data.min_expires > refresh_interval: interval = e.data.min_expires else: interval = None raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval) elif e.data.code in (405, 406, 489): raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True) elif e.data.code == 1400: raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True) else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, give up raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._sip_subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'presence': subscription_state = notification.data.headers.get('Subscription-State').state if subscription_state == 'active' and self._xmpp_subscription.state != 'active': self._xmpp_subscription.accept() elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active': # The state went from active to pending, hide the presence state? pass if notification.data.body: self._process_pidf(notification.data.body) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail, e: if e.data.code == 0 and e.data.reason == 'rejected': self._xmpp_subscription.reject() else: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._sip_subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: notification_center.remove_observer(self, sender=self._sip_subscription) try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._sip_subscription is not None: try: self._sip_subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._sip_subscription) except SubscriptionError, e: if not e.fatal: self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval)) finally: self.subscribed = False self._sip_subscription = None self._sip_subscription_proc = None reactor.callLater(0, self.end) @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification): self.end() def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification): if self._sip_subscription.state.lower() == 'active': self._xmpp_subscription.accept() _NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe diff --git a/sylk/applications/xmppgateway/xmpp/__init__.py b/sylk/applications/xmppgateway/xmpp/__init__.py index 8015062..660826f 100644 --- a/sylk/applications/xmppgateway/xmpp/__init__.py +++ b/sylk/applications/xmppgateway/xmpp/__init__.py @@ -1,304 +1,304 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # import os -from application.notification import IObserver, NotificationCenter +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from datetime import datetime -from sipsimple.util import Timestamp, TimestampedNotificationData +from sipsimple.util import ISOTimestamp from twisted.internet import reactor from twisted.words.protocols.jabber.jid import internJID as JID from wokkel.component import InternalComponent, Router as _Router from wokkel.server import ServerService, XMPPS2SServerFactory, DeferredS2SClientFactory from zope.interface import implements from sylk.applications import ApplicationLogger from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig from sylk.applications.xmppgateway.datatypes import FrozenURI from sylk.applications.xmppgateway.xmpp.logger import Logger as XMPPLogger from sylk.applications.xmppgateway.xmpp.protocols import MessageProtocol, MUCProtocol, PresenceProtocol from sylk.applications.xmppgateway.xmpp.session import XMPPChatSessionManager, XMPPMucSessionManager from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscriptionManager log = ApplicationLogger(os.path.dirname(__file__).split(os.path.sep)[-1]) xmpp_logger = XMPPLogger() # Utility classes class Router(_Router): def route(self, stanza): """ Route a stanza. (subclassed to avoid vebose logging) @param stanza: The stanza to be routed. @type stanza: L{domish.Element}. """ destination = JID(stanza['to']) if destination.host in self.routes: self.routes[destination.host].send(stanza) else: self.routes[None].send(stanza) class XMPPS2SServerFactory(XMPPS2SServerFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: - xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf) def logDataOut(buf): buf = buf.strip() if buf: - xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut class DeferredS2SClientFactory(DeferredS2SClientFactory): def onConnectionMade(self, xs): super(self.__class__, self).onConnectionMade(xs) def logDataIn(buf): buf = buf.strip() if buf: - xmpp_logger.msg("RECEIVED", Timestamp(datetime.now()), buf) + xmpp_logger.msg("RECEIVED", ISOTimestamp.now(), buf) def logDataOut(buf): buf = buf.strip() if buf: - xmpp_logger.msg("SENDING", Timestamp(datetime.now()), buf) + xmpp_logger.msg("SENDING", ISOTimestamp.now(), buf) if XMPPGatewayConfig.trace_xmpp: xs.rawDataInFn = logDataIn xs.rawDataOutFn = logDataOut # Patch Wokkel's DeferredS2SClientFactory to use our logger import wokkel.server wokkel.server.DeferredS2SClientFactory = DeferredS2SClientFactory del wokkel.server # Manager class XMPPManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): config = XMPPGatewayConfig self.stopped = False self.domains = config.domains self.muc_domains = ['%s.%s' % (config.muc_prefix, domain) for domain in self.domains] router = Router() self._server_service = ServerService(router) self._server_service.domains = set(self.domains+self.muc_domains) self._server_service.logTraffic = False # done manually self._s2s_factory = XMPPS2SServerFactory(self._server_service) self._s2s_factory.logTraffic = False # done manually self._internal_component = InternalComponent(router) self._internal_component.domains = set(self.domains) self._message_protocol = MessageProtocol() self._message_protocol.setHandlerParent(self._internal_component) self._presence_protocol = PresenceProtocol() self._presence_protocol.setHandlerParent(self._internal_component) self._muc_component = InternalComponent(router) self._muc_component.domains = set(self.muc_domains) self._muc_protocol = MUCProtocol() self._muc_protocol.setHandlerParent(self._muc_component) self._s2s_listener = None self.chat_session_manager = XMPPChatSessionManager() self.muc_session_manager = XMPPMucSessionManager() self.subscription_manager = XMPPSubscriptionManager() def start(self): self.stopped = False xmpp_logger.start() config = XMPPGatewayConfig self._s2s_listener = reactor.listenTCP(config.local_port, self._s2s_factory, interface=config.local_ip) listen_address = self._s2s_listener.getHost() log.msg("XMPP listener started on %s:%d" % (listen_address.host, listen_address.port)) self.chat_session_manager.start() self.muc_session_manager.start() self.subscription_manager.start() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self._internal_component) notification_center.add_observer(self, sender=self._muc_component) self._internal_component.startService() self._muc_component.startService() def stop(self): self.stopped = True self._s2s_listener.stopListening() self.subscription_manager.stop() self.muc_session_manager.stop() self.chat_session_manager.stop() self._internal_component.stopService() self._muc_component.stopService() notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self._internal_component) notification_center.remove_observer(self, sender=self._muc_component) xmpp_logger.stop() def send_stanza(self, stanza): if self.stopped: return self._internal_component.send(stanza.to_xml_element()) def send_muc_stanza(self, stanza): if self.stopped: return self._muc_component.send(stanza.to_xml_element()) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Process message stanzas def _NH_XMPPGotChatMessage(self, notification): message = notification.data.message try: session = self.chat_session_manager.sessions[(message.recipient.uri, message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotChatMessage', sender=self, data=notification.data) else: session.channel.send(message) def _NH_XMPPGotNormalMessage(self, notification): notification_center = NotificationCenter() notification_center.post_notification('XMPPGotNormalMessage', sender=self, data=notification.data) def _NH_XMPPGotComposingIndication(self, notification): notification_center = NotificationCenter() composing_indication = notification.data.composing_indication try: session = self.chat_session_manager.sessions[(composing_indication.recipient.uri, composing_indication.sender.uri)] except KeyError: notification_center.post_notification('XMPPGotComposingIndication', sender=self, data=notification.data) else: session.channel.send(composing_indication) def _NH_XMPPGotErrorMessage(self, notification): error_message = notification.data.error_message try: session = self.chat_session_manager.sessions[(error_message.recipient.uri, error_message.sender.uri)] except KeyError: notification_center = NotificationCenter() notification_center.post_notification('XMPPGotErrorMessage', sender=self, data=notification.data) else: session.channel.send(error_message) def _NH_XMPPGotReceipt(self, notification): receipt = notification.data.receipt try: session = self.chat_session_manager.sessions[(receipt.recipient.uri, receipt.sender.uri)] except KeyError: pass else: session.channel.send(receipt) # Process presence stanzas def _NH_XMPPGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: # Ignore incoming presence stanzas if there is no subscription pass else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceSubscriptionStatus(self, notification): stanza = notification.data.presence_stanza if stanza.sender.uri.resource is not None or stanza.recipient.uri.resource is not None: # Skip directed presence return if stanza.type in ('subscribed', 'unsubscribed'): try: subscription = self.subscription_manager.outgoing_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: pass else: subscription.channel.send(stanza) elif stanza.type in ('subscribe', 'unsubscribe'): try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, stanza.sender.uri)] except KeyError: if stanza.type == 'subscribe': notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) def _NH_XMPPGotPresenceProbe(self, notification): stanza = notification.data.presence_stanza if stanza.recipient.uri.resource is not None: # Skip directed presence return sender_uri = stanza.sender.uri sender_uri_bare = FrozenURI(sender_uri.user, sender_uri.host) try: subscription = self.subscription_manager.incoming_subscriptions[(stanza.recipient.uri, sender_uri_bare)] except KeyError: notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + notification_center.post_notification('XMPPGotPresenceSubscriptionRequest', sender=self, data=NotificationData(stanza=stanza)) else: subscription.channel.send(stanza) # Process muc stanzas def _NH_XMPPMucGotGroupChat(self, notification): message = notification.data.message muc_uri = FrozenURI(message.recipient.uri.user, message.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, message.sender.uri)] except KeyError: # Ignore groupchat messages if there was no session created pass else: session.channel.send(message) def _NH_XMPPMucGotPresenceAvailability(self, notification): stanza = notification.data.presence_stanza if not stanza.sender.uri.resource: return muc_uri = FrozenURI(stanza.recipient.uri.user, stanza.recipient.uri.host) try: session = self.muc_session_manager.incoming[(muc_uri, stanza.sender.uri)] except KeyError: notification_center = NotificationCenter() if stanza.available: - notification_center.post_notification('XMPPGotMucJoinRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + notification_center.post_notification('XMPPGotMucJoinRequest', sender=self, data=NotificationData(stanza=stanza)) else: - notification_center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=TimestampedNotificationData(stanza=stanza)) + notification_center.post_notification('XMPPGotMucLeaveRequest', sender=self, data=NotificationData(stanza=stanza)) else: session.channel.send(stanza) diff --git a/sylk/applications/xmppgateway/xmpp/protocols.py b/sylk/applications/xmppgateway/xmpp/protocols.py index 4fb1a0e..cff0ef6 100644 --- a/sylk/applications/xmppgateway/xmpp/protocols.py +++ b/sylk/applications/xmppgateway/xmpp/protocols.py @@ -1,193 +1,192 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # -from application.notification import NotificationCenter -from sipsimple.util import TimestampedNotificationData +from application.notification import NotificationCenter, NotificationData from wokkel.muc import UserPresence from wokkel.xmppim import BasePresenceProtocol, MessageProtocol, PresenceProtocol from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI from sylk.applications.xmppgateway.xmpp.stanzas import RECEIPTS_NS, CHATSTATES_NS, ErrorStanza, \ NormalMessage, MessageReceipt, ChatMessage, ChatComposingIndication, \ AvailabilityPresence, SubscriptionPresence, ProbePresence, \ MUCAvailabilityPresence, GroupChatMessage \ __all__ = ['MessageProtocol', 'MUCProtocol', 'PresenceProtocol'] class MessageProtocol(MessageProtocol): messageTypes = None, 'normal', 'chat', 'headline', 'groupchat', 'error' def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType not in self.messageTypes: message["type"] = 'normal' self.onMessage(message) def onMessage(self, msg): notification_center = NotificationCenter() sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) type = msg.getAttribute('type') if type == 'error': error_type = msg.error['type'] conditions = [(child.name, child.defaultUri) for child in msg.error.children] error_message = ErrorStanza('message', sender, recipient, error_type, conditions, id=msg.getAttribute('id', None)) - notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=TimestampedNotificationData(error_message=error_message)) + notification_center.post_notification('XMPPGotErrorMessage', sender=self.parent, data=NotificationData(error_message=error_message)) return if type in (None, 'normal', 'chat') and msg.body is not None or msg.html is not None: body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) use_receipt = msg.request is not None and msg.request.defaultUri == RECEIPTS_NS if type == 'chat': message = ChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None), use_receipt=use_receipt) - notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + notification_center.post_notification('XMPPGotChatMessage', sender=self.parent, data=NotificationData(message=message)) else: message = NormalMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None), use_receipt=use_receipt) - notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=TimestampedNotificationData(message=message)) + notification_center.post_notification('XMPPGotNormalMessage', sender=self.parent, data=NotificationData(message=message)) return if type == 'chat' and msg.body is None and msg.html is None: # Check if it's a composing indication for state in ('active', 'inactive', 'composing', 'paused', 'gone'): state_obj = getattr(msg, state, None) if state_obj is not None and state_obj.defaultUri == CHATSTATES_NS: composing_indication = ChatComposingIndication(sender, recipient, state, id=msg.getAttribute('id', None)) - notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=TimestampedNotificationData(composing_indication=composing_indication)) + notification_center.post_notification('XMPPGotComposingIndication', sender=self.parent, data=NotificationData(composing_indication=composing_indication)) return # Check if it's a receipt acknowledgement if msg.body is None and msg.html is None and msg.received is not None and msg.received.defaultUri == RECEIPTS_NS: receipt_id = msg.getAttribute('id', None) if receipt_id is not None: receipt = MessageReceipt(sender, recipient, receipt_id) - notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=TimestampedNotificationData(receipt=receipt)) + notification_center.post_notification('XMPPGotReceipt', sender=self.parent, data=NotificationData(receipt=receipt)) class PresenceProtocol(PresenceProtocol): def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') show = stanza.show statuses = stanza.statuses presence_stanza = AvailabilityPresence(sender, recipient, available=True, show=show, statuses=statuses, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = AvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def _process_subscription_stanza(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') type = stanza.element.getAttribute('type') presence_stanza = SubscriptionPresence(sender, recipient, type, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPGotPresenceSubscriptionStatus', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def subscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribedReceived(self, stanza): self._process_subscription_stanza(stanza) def subscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def unsubscribeReceived(self, stanza): self._process_subscription_stanza(stanza) def probeReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = ProbePresence(sender, recipient, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPGotPresenceProbe', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) class MUCProtocol(BasePresenceProtocol): messageTypes = None, 'normal', 'chat', 'groupchat' presenceTypeParserMap = {'available': UserPresence, 'unavailable': UserPresence} def connectionInitialized(self): BasePresenceProtocol.connectionInitialized(self) self.xmlstream.addObserver('/message', self._onMessage) def _onMessage(self, message): if message.handled: return messageType = message.getAttribute("type") if messageType == 'error': return if messageType not in self.messageTypes: message['type'] = 'normal' if messageType == 'groupchat': self.onGroupChat(message) else: # TODO: give error, private messages not supported pass def onGroupChat(self, msg): sender_uri = FrozenURI.parse('xmpp:'+msg['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+msg['to']) recipient = Identity(recipient_uri) body = None html_body = None if msg.html is not None: html_body = msg.html.toXml() if msg.body is not None: body = unicode(msg.body) message = GroupChatMessage(sender, recipient, body, html_body, id=msg.getAttribute('id', None)) notification_center = NotificationCenter() - notification_center.post_notification('XMPPMucGotGroupChat', sender=self.parent, data=TimestampedNotificationData(message=message)) + notification_center.post_notification('XMPPMucGotGroupChat', sender=self.parent, data=NotificationData(message=message)) def availableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=True, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) def unavailableReceived(self, stanza): sender_uri = FrozenURI.parse('xmpp:'+stanza.element['from']) sender = Identity(sender_uri) recipient_uri = FrozenURI.parse('xmpp:'+stanza.element['to']) recipient = Identity(recipient_uri) id = stanza.element.getAttribute('id') presence_stanza = MUCAvailabilityPresence(sender, recipient, available=False, id=id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=TimestampedNotificationData(presence_stanza=presence_stanza)) + notification_center.post_notification('XMPPMucGotPresenceAvailability', sender=self.parent, data=NotificationData(presence_stanza=presence_stanza)) diff --git a/sylk/applications/xmppgateway/xmpp/session.py b/sylk/applications/xmppgateway/xmpp/session.py index cd77dd6..d20d22f 100644 --- a/sylk/applications/xmppgateway/xmpp/session.py +++ b/sylk/applications/xmppgateway/xmpp/session.py @@ -1,224 +1,223 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # -from application.notification import IObserver, NotificationCenter +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlet import coros, proc -from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage, ChatComposingIndication, MessageReceipt, ErrorStanza, GroupChatMessage, MUCAvailabilityPresence __all__ = ['XMPPChatSession', 'XMPPChatSessionManager', 'XMPPIncomingMucSession', 'XMPPMucSessionManager'] # Chat sessions class XMPPChatSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.pending_receipts = {} self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPChatSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self.send_composing_indication('gone') self._clear_pending_receipts() self._proc.kill() self._proc = None notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, body, html_body, message_id=None, use_receipt=True): message = ChatMessage(self.local_identity, self.remote_identity, body, html_body, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_composing_indication(self, state, message_id=None, use_receipt=False): message = ChatComposingIndication(self.local_identity, self.remote_identity, state, id=message_id, use_receipt=use_receipt) self.xmpp_manager.send_stanza(message) if message_id is not None: timer = reactor.callLater(30, self._receipt_timer_expired, message_id) self.pending_receipts[message_id] = timer notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=TimestampedNotificationData(message=message)) + notification_center.post_notification('XMPPChatSessionDidSendMessage', sender=self, data=NotificationData(message=message)) def send_receipt_acknowledgement(self, receipt_id): message = MessageReceipt(self.local_identity, self.remote_identity, receipt_id) self.xmpp_manager.send_stanza(message) def send_error(self, stanza, error_type, conditions): message = ErrorStanza.from_stanza(stanza, error_type, conditions) self.xmpp_manager.send_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, ChatMessage): - notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) + notification_center.post_notification('XMPPChatSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, ChatComposingIndication): if item.state == 'gone': self._clear_pending_receipts() - notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='remote')) + notification_center.post_notification('XMPPChatSessionDidEnd', sender=self, data=NotificationData(originator='remote')) self.state = 'terminated' break else: - notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=TimestampedNotificationData(message=item)) + notification_center.post_notification('XMPPChatSessionGotComposingIndication', sender=self, data=NotificationData(message=item)) elif isinstance(item, MessageReceipt): if item.receipt_id in self.pending_receipts: timer = self.pending_receipts.pop(item.receipt_id) timer.cancel() - notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.receipt_id)) + notification_center.post_notification('XMPPChatSessionDidDeliverMessage', sender=self, data=NotificationData(message_id=item.receipt_id)) elif isinstance(item, ErrorStanza): if item.id in self.pending_receipts: timer = self.pending_receipts.pop(item.id) timer.cancel() # TODO: translate cause - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=item.id, code=503, reason='Service Unavailable')) + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=item.id, code=503, reason='Service Unavailable')) self._proc = None def _receipt_timer_expired(self, message_id): self.pending_receipts.pop(message_id) notification_center = NotificationCenter() - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) def _clear_pending_receipts(self): notification_center = NotificationCenter() while self.pending_receipts: message_id, timer = self.pending_receipts.popitem() timer.cancel() - notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=TimestampedNotificationData(message_id=message_id, code=408, reason='Timeout')) + notification_center.post_notification('XMPPChatSessionDidNotDeliverMessage', sender=self, data=NotificationData(message_id=message_id, code=408, reason='Timeout')) class XMPPChatSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.sessions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPChatSessionDidStart') notification_center.add_observer(self, name='XMPPChatSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPChatSessionDidStart') notification_center.remove_observer(self, name='XMPPChatSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPChatSessionDidStart(self, notification): session = notification.sender self.sessions[(session.local_identity.uri, session.remote_identity.uri)] = session def _NH_XMPPChatSessionDidEnd(self, notification): session = notification.sender del self.sessions[(session.local_identity.uri, session.remote_identity.uri)] # MUC sessions class XMPPIncomingMucSession(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def start(self): notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingMucSessionDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingMucSessionDidStart', sender=self) self._proc = proc.spawn(self._run) self.state = 'started' def end(self): self._proc.kill() self._proc = None notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def send_message(self, sender, body, html_body, message_id=None): # TODO: timestamp? message = GroupChatMessage(sender, self.remote_identity, body, html_body, id=message_id) self.xmpp_manager.send_muc_stanza(message) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, GroupChatMessage): - notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=TimestampedNotificationData(message=item)) + notification_center.post_notification('XMPPIncomingMucSessionGotMessage', sender=self, data=NotificationData(message=item)) elif isinstance(item, MUCAvailabilityPresence): if item.available: nickname = item.recipient.uri.resource - notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=TimestampedNotificationData(stanza=item, nickname=nickname)) + notification_center.post_notification('XMPPIncomingMucSessionChangedNickname', sender=self, data=NotificationData(stanza=item, nickname=nickname)) else: - notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPIncomingMucSessionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' break self._proc = None class XMPPMucSessionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming = {} self.outgoing = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.add_observer(self, name='XMPPIncomingMucSessionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidStart') notification_center.remove_observer(self, name='XMPPIncomingMucSessionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPIncomingMucSessionDidStart(self, notification): muc = notification.sender self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] = muc def _NH_XMPPIncomingMucSessionDidEnd(self, notification): muc = notification.sender del self.incoming[(muc.local_identity.uri, muc.remote_identity.uri)] diff --git a/sylk/applications/xmppgateway/xmpp/subscription.py b/sylk/applications/xmppgateway/xmpp/subscription.py index 5821b48..a4e2695 100644 --- a/sylk/applications/xmppgateway/xmpp/subscription.py +++ b/sylk/applications/xmppgateway/xmpp/subscription.py @@ -1,207 +1,206 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # -from application.notification import IObserver, NotificationCenter +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.descriptor import WriteOnceAttribute from application.python.types import Singleton from eventlet import coros, proc -from sipsimple.util import TimestampedNotificationData from zope.interface import implements from sylk.applications.xmppgateway.xmpp.stanzas import SubscriptionPresence, ProbePresence, AvailabilityPresence __all__ = ['XMPPSubscription', 'XMPPIncomingSubscription', 'XMPPSubscriptionManager'] class XMPPSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + notification_center.post_notification('XMPPSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) self.subscribe() def end(self): if self.state == 'terminated': return self._proc.kill() self._proc = None notification_center = NotificationCenter() - notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) self.state = 'terminated' def subscribe(self): self.state = 'subscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribe') self.xmpp_manager.send_stanza(stanza) # If we are already subscribed we may not receive an answer, send a probe just in case self._send_probe() def unsubscribe(self): self.state = 'unsubscribe_sent' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribe') self.xmpp_manager.send_stanza(stanza) def _send_probe(self): self.state = 'subscribe_sent' stanza = ProbePresence(self.local_identity, self.remote_identity) self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, AvailabilityPresence): if self.state == 'subscribe_sent': self.state == 'active' - notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=TimestampedNotificationData(presence=item)) + notification_center.post_notification('XMPPSubscriptionGotNotify', sender=self, data=NotificationData(presence=item)) elif isinstance(item, SubscriptionPresence): if self.state == 'subscribe_sent' and item.type == 'subscribed': self.state = 'active' elif item.type == 'unsubscribed': prev_state = self.state self.state = 'terminated' if prev_state in ('active', 'unsubscribe_sent'): - notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPSubscriptionDidEnd', sender=self) else: - notification_center.post_notification('XMPPSubscriptionDidFail', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPSubscriptionDidFail', sender=self) break self._proc = None class XMPPIncomingSubscription(object): local_identity = WriteOnceAttribute() remote_identity = WriteOnceAttribute() def __init__(self, local_identity, remote_identity): self.local_identity = local_identity self.remote_identity = remote_identity self.state = None self.channel = coros.queue() self._proc = None from sylk.applications.xmppgateway.xmpp import XMPPManager self.xmpp_manager = XMPPManager() def _set_state(self, new_state): prev_state = self.__dict__.get('state', None) self.__dict__['state'] = new_state if prev_state != new_state: notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=TimestampedNotificationData(prev_state=prev_state, state=new_state)) + notification_center.post_notification('XMPPIncomingSubscriptionChangedState', sender=self, data=NotificationData(prev_state=prev_state, state=new_state)) def _get_state(self): return self.__dict__['state'] state = property(_get_state, _set_state) del _get_state, _set_state def start(self): notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingSubscriptionDidStart', sender=self) self._proc = proc.spawn(self._run) def end(self): if self.state == 'terminated': return self.state = 'terminated' self._proc.kill() self._proc = None notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) def accept(self): self.state = 'active' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'subscribed') self.xmpp_manager.send_stanza(stanza) def reject(self): self.state = 'terminating' stanza = SubscriptionPresence(self.local_identity, self.remote_identity, 'unsubscribed') self.xmpp_manager.send_stanza(stanza) self.end() def send_presence(self, stanza): self.xmpp_manager.send_stanza(stanza) def _run(self): notification_center = NotificationCenter() while True: item = self.channel.wait() if isinstance(item, SubscriptionPresence): if item.type == 'subscribe': - notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingSubscriptionGotSubscribe', sender=self) elif item.type == 'unsubscribe': self.state = 'terminated' notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self, data=TimestampedNotificationData()) - notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=TimestampedNotificationData(originator='local')) + notification_center.post_notification('XMPPIncomingSubscriptionGotUnsubscribe', sender=self) + notification_center.post_notification('XMPPIncomingSubscriptionDidEnd', sender=self, data=NotificationData(originator='local')) break elif isinstance(item, ProbePresence): notification_center = NotificationCenter() - notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('XMPPIncomingSubscriptionGotProbe', sender=self) self._proc = None class XMPPSubscriptionManager(object): __metaclass__ = Singleton implements(IObserver) def __init__(self): self.incoming_subscriptions = {} self.outgoing_subscriptions = {} def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='XMPPSubscriptionDidStart') notification_center.add_observer(self, name='XMPPSubscriptionDidEnd') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.add_observer(self, name='XMPPIncomingSubscriptionDidEnd') def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self, name='XMPPSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPSubscriptionDidEnd') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidStart') notification_center.remove_observer(self, name='XMPPIncomingSubscriptionDidEnd') def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_XMPPSubscriptionDidStart(self, notification): subscription = notification.sender self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPSubscriptionDidEnd(self, notification): subscription = notification.sender del self.outgoing_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] def _NH_XMPPIncomingSubscriptionDidStart(self, notification): subscription = notification.sender self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] = subscription def _NH_XMPPIncomingSubscriptionDidEnd(self, notification): subscription = notification.sender del self.incoming_subscriptions[(subscription.local_identity.uri, subscription.remote_identity.uri)] diff --git a/sylk/bonjour.py b/sylk/bonjour.py index ad19c94..ef2ced1 100644 --- a/sylk/bonjour.py +++ b/sylk/bonjour.py @@ -1,240 +1,239 @@ # Copyright (C) 2012 AG Projects. See LICENSE for details # from application import log -from application.notification import NotificationCenter, IObserver +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from eventlet import api, coros, proc from eventlet.green import select -from sipsimple.account import bonjour, BonjourRegistrationFile from sipsimple.account import AccountManager +from sipsimple.account.bonjour import _bonjour, BonjourRegistrationFile from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread -from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements class RestartSelect(Exception): pass class BonjourServices(object): implements(IObserver) def __init__(self, service='sipfocus', name='SylkServer', uri_user=None): self.account = AccountManager().default_account self.service = service self.name = name self.uri_user = uri_user self._stopped = True self._files = [] self._command_channel = coros.queue() self._select_proc = None self._register_timer = None self._update_timer = None self._wakeup_timer = None @run_in_green_thread def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') self._select_proc = proc.spawn(self._process_files) proc.spawn(self._handle_commands) self._activate() @run_in_green_thread def stop(self): self._deactivate() notification_center = NotificationCenter() notification_center.remove_observer(self, name='SystemIPAddressDidChange') notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep') self._select_proc.kill() self._command_channel.send_exception(api.GreenletExit) def _activate(self): self._stopped = False self._command_channel.send(Command('register')) def _deactivate(self): command = Command('stop') self._command_channel.send(command) command.wait() self._stopped = True def restart_registration(self): self._command_channel.send(Command('unregister')) self._command_channel.send(Command('register')) def update_registrations(self): self._command_channel.send(Command('update_registrations')) def _register_cb(self, file, flags, error_code, name, regtype, domain): notification_center = NotificationCenter() file = BonjourRegistrationFile.find_by_file(file) - if error_code == bonjour.kDNSServiceErr_NoError: + if error_code == _bonjour.kDNSServiceErr_NoError: notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self, - data=TimestampedNotificationData(name=name, transport=file.transport)) + data=NotificationData(name=name, transport=file.transport)) else: - error = bonjour.BonjourError(error_code) + error = _bonjour.BonjourError(error_code) notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, - data=TimestampedNotificationData(reason=str(error), transport=file.transport)) + data=NotificationData(reason=str(error), transport=file.transport)) self._files.remove(file) self._select_proc.kill(RestartSelect) file.close() if self._register_timer is None: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register')) def _process_files(self): while True: try: ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0] except RestartSelect: continue else: for file in ready: file.active = True self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed])) def _handle_commands(self): while True: command = self._command_channel.wait() if not self._stopped: handler = getattr(self, '_CH_%s' % command.name) handler(command) def _CH_unregister(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): - notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=TimestampedNotificationData(transport=transport)) + notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() def _CH_register(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None) registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile)) missing_transports = supported_transports - registered_transports added_transports = set() for transport in missing_transports: - notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=TimestampedNotificationData(transport=transport)) + notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport)) try: contact_uri = self.account.contact[transport] contact_uri.user = self.uri_user contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri)) - file = bonjour.DNSServiceRegister(name=str(contact_uri), + file = _bonjour.DNSServiceRegister(name=str(contact_uri), regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'), port=contact_uri.port, callBack=self._register_cb, - txtRecord=bonjour.TXTRecord(items=txtdata)) - except (bonjour.BonjourError, KeyError), e: + txtRecord=_bonjour.TXTRecord(items=txtdata)) + except (_bonjour.BonjourError, KeyError), e: notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self, - data=TimestampedNotificationData(reason=str(e), transport=transport)) + data=NotificationData(reason=str(e), transport=transport)) else: self._files.append(BonjourRegistrationFile(file, transport)) added_transports.add(transport) if added_transports: self._select_proc.kill(RestartSelect) if added_transports != missing_transports: self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event)) else: command.signal() def _CH_update_registrations(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None available_transports = settings.sip.transport_list old_files = [] for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports): old_files.append(file) self._files.remove(file) self._select_proc.kill(RestartSelect) for file in old_files: file.close() update_failure = False for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)): try: contact_uri = self.account.contact[file.transport] contact_uri.user = self.user_uri contact_uri.parameters['isfocus'] = None txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri)) - bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=bonjour.TXTRecord(items=txtdata), ttl=0) - except (bonjour.BonjourError, KeyError), e: + _bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0) + except (_bonjour.BonjourError, KeyError), e: notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self, - data=TimestampedNotificationData(reason=str(e), transport=file.transport)) + data=NotificationData(reason=str(e), transport=file.transport)) update_failure = True self._command_channel.send(Command('register')) if update_failure: self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event)) else: command.signal() def _CH_process_results(self, command): for file in (f for f in command.files if not f.closed): try: - bonjour.DNSServiceProcessResult(file.file) + _bonjour.DNSServiceProcessResult(file.file) except: # Should we close the file? The documentation doesn't say anything about this. -Luci log.err() for file in command.files: file.active = False self._files = [f for f in self._files if not f.closed] self._select_proc.kill(RestartSelect) def _CH_stop(self, command): if self._register_timer is not None and self._register_timer.active(): self._register_timer.cancel() self._register_timer = None if self._update_timer is not None and self._update_timer.active(): self._update_timer.cancel() self._update_timer = None if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None old_files = self._files self._files = [] self._select_proc.kill(RestartSelect) for file in old_files: file.close() notification_center = NotificationCenter() for transport in set(file.transport for file in self._files): - notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=TimestampedNotificationData(transport=transport)) + notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport)) command.signal() @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SystemIPAddressDidChange(self, notification): if self._files: self.restart_registration() def _NH_SystemDidWakeUpFromSleep(self, notification): if self._wakeup_timer is None: def wakeup_action(): if self._files: self.restart_registration() self._wakeup_timer = None self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize diff --git a/sylk/extensions.py b/sylk/extensions.py index f928733..200c69d 100644 --- a/sylk/extensions.py +++ b/sylk/extensions.py @@ -1,229 +1,228 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # import random from datetime import datetime from dateutil.tz import tzlocal -from application.notification import NotificationCenter +from application.notification import NotificationCenter, NotificationData from eventlet import api from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from msrplib.protocol import URI from msrplib.session import contains_mime_type from msrplib.transport import make_response from sipsimple.account import AccountManager from sipsimple.core import SDPAttribute from sipsimple.payloads.iscomposing import IsComposingDocument, IsComposingMessage, State, LastActive, Refresh, ContentType from sipsimple.streams import MediaStreamRegistry from sipsimple.streams.applications.chat import CPIMMessage, CPIMParserError from sipsimple.streams.msrp import ChatStreamError, MSRPStreamError, NotificationProxyLogger from sipsimple.streams.msrp import ChatStream as _ChatStream, MSRPStreamBase as _MSRPStreamBase from sipsimple.threading.green import run_in_green_thread -from sipsimple.util import TimestampedNotificationData from twisted.python.failure import Failure from sylk.configuration import SIPConfig from sylk.session import ServerSession # We need to match on the only account that will be available def _always_find_default_account(self, contact_uri): return self.default_account AccountManager.find_account = _always_find_default_account # Patch sipsimple.session to use ServerSession instead import sipsimple.session sipsimple.session.Session = ServerSession # We need to be able to set the local identity in the message CPIM envelope # so that messages appear to be coming from the users themselves, instead of # just seeying the server identity registry = MediaStreamRegistry() for stream_type in registry.stream_types[:]: if stream_type is _ChatStream: registry.stream_types.remove(stream_type) break del registry class MSRPStreamBase(_MSRPStreamBase): @run_in_green_thread def initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.account.msrp.transport outgoing = direction=='outgoing' logger = NotificationProxyLogger() if self.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif not outgoing and not self.account.nat_traversal.use_msrp_relay_for_inbound: if self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' elif outgoing and not self.account.nat_traversal.use_msrp_relay_for_outbound: self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if self.account.nat_traversal.msrp_relay is None: relay_host = relay_port = None else: if self.transport != self.account.nat_traversal.msrp_relay.transport: raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") relay_host = self.account.nat_traversal.msrp_relay.host relay_port = self.account.nat_traversal.msrp_relay.port relay = MSRPRelaySettings(domain=self.account.uri.host, username=self.account.uri.user, password=self.account.credentials.password, host=relay_host, port=relay_port, use_tls=self.transport=='tls') self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport=='tls' and None in (self.account.tls_credentials.cert, self.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(self.local_uri) self.local_media = self._create_local_media(full_local_path) except api.GreenletExit: raise except Exception, ex: - ndata = TimestampedNotificationData(context='initialize', failure=Failure(), reason=str(ex)) + ndata = NotificationData(context='initialize', failure=Failure(), reason=str(ex)) notification_center.post_notification('MediaStreamDidFail', self, ndata) else: - notification_center.post_notification('MediaStreamDidInitialize', self, data=TimestampedNotificationData()) + notification_center.post_notification('MediaStreamDidInitialize', self) finally: if self.msrp_session is None and self.msrp is None and self.msrp_connector is None: notification_center.remove_observer(self, sender=self) self.greenlet = None class ChatStream(_ChatStream, MSRPStreamBase): accept_types = ['message/cpim'] accept_wrapped_types = ['*'] chatroom_capabilities = ['nickname', 'private-messages', 'com.ag-projects.screen-sharing'] @property def local_uri(self): return URI(host=SIPConfig.local_ip, port=0, use_tls=self.transport=='tls', credentials=self.account.tls_credentials) def _create_local_media(self, uri_path): local_media = MSRPStreamBase._create_local_media(self, uri_path) if self.session.local_focus and self.chatroom_capabilities: local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.chatroom_capabilities))) return local_media def _handle_SEND(self, chunk): # This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if not chunk.data: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.segment is not None: self.incoming_queue.setdefault(chunk.message_id, []).append(chunk.data) if chunk.final: chunk.data = ''.join(self.incoming_queue.pop(chunk.message_id)) else: self.msrp_session.send_report(chunk, 200, 'OK') return if chunk.content_type.lower() == 'message/cpim': try: message = CPIMMessage.parse(chunk.data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return else: if message.timestamp is None: message.timestamp = datetime.now(tzlocal()) if message.sender is None: message.sender = self.remote_identity private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity else: self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') return # TODO: check wrapped content-type and issue a report if it's invalid notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: data = IsComposingDocument.parse(message.body) - ndata = TimestampedNotificationData(state=data.state.value, + ndata = NotificationData(state=data.state.value, refresh=data.refresh.value if data.refresh is not None else None, content_type=data.content_type.value if data.content_type is not None else None, last_active=data.last_active.value if data.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, chunk=chunk) notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata) else: - notification_center.post_notification('ChatStreamGotMessage', self, TimestampedNotificationData(message=message, private=private, chunk=chunk)) + notification_center.post_notification('ChatStreamGotMessage', self, NotificationData(message=message, private=private, chunk=chunk)) def _handle_NICKNAME(self, chunk): nickname = chunk.headers['Use-Nickname'].decoded notification_center = NotificationCenter() - notification_center.post_notification('ChatStreamGotNicknameRequest', self, TimestampedNotificationData(nickname=nickname, chunk=chunk)) + notification_center.post_notification('ChatStreamGotNicknameRequest', self, NotificationData(nickname=nickname, chunk=chunk)) @run_in_green_thread def _send_response(self, response): try: self.msrp_session.send_chunk(response) except Exception: pass def accept_nickname(self, chunk): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) response = make_response(chunk, 200, 'OK') self._send_response(response) def reject_nickname(self, chunk, code, reason): if chunk.method != 'NICKNAME': raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method) response = make_response(chunk, code, reason) self._send_response(response) def send_message(self, content, content_type='text/plain', local_identity=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, message_id=None, notify_progress=True, success_report='yes', failure_report='yes'): if self.direction=='recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if message_id is None: message_id = '%x' % random.getrandbits(64) if not contains_mime_type(self.accept_wrapped_types, content_type): raise ChatStreamError('Invalid content_type for outgoing message: %r' % content_type) if not recipients: recipients = [self.remote_identity] if timestamp is None: timestamp = datetime.now() # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, content_type, sender=local_identity or self.local_identity, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report=failure_report, success_report=success_report, notify_progress=notify_progress) return message_id def send_composing_indication(self, state, refresh, last_active=None, recipients=None, local_identity=None, message_id=None, notify_progress=False, success_report='no', failure_report='partial'): if self.direction == 'recvonly': raise ChatStreamError('Cannot send message on recvonly stream') if state not in ('active', 'idle'): raise ValueError('Invalid value for composing indication state') if message_id is None: message_id = '%x' % random.getrandbits(64) content = IsComposingMessage(state=State(state), refresh=Refresh(refresh), last_active=LastActive(last_active or datetime.now()), content_type=ContentType('text')).toxml() if recipients is None: recipients = [self.remote_identity] # Only use CPIM, it's the only type we accept msg = CPIMMessage(content, IsComposingDocument.content_type, sender=local_identity or self.local_identity, recipients=recipients, timestamp=datetime.now()) self._enqueue_message(str(message_id), str(msg), 'message/cpim', failure_report='partial', success_report='no') return message_id diff --git a/sylk/interfaces/sipthor.py b/sylk/interfaces/sipthor.py index 4f201c4..dd58899 100644 --- a/sylk/interfaces/sipthor.py +++ b/sylk/interfaces/sipthor.py @@ -1,112 +1,111 @@ # Copyright (C) 2011 AG-Projects. # # This module is proprietary to AG Projects. Use of this module by third # parties is not supported. __all__ = ['ConferenceNode'] from application import log -from application.notification import NotificationCenter +from application.notification import NotificationCenter, NotificationData from application.python.types import Singleton from eventlet.twistedutil import block_on, callInGreenThread from gnutls.interfaces.twisted import X509Credentials from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL -from sipsimple.util import TimestampedNotificationData from twisted.internet import defer from thor.eventservice import EventServiceClient, ThorEvent from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity from thor.scheduler import KeepRunning import sylk from sylk.configuration import SIPConfig, ThorNodeConfig class ConferenceNode(EventServiceClient): __metaclass__ = Singleton topics = ["Thor.Members"] def __init__(self): # Needs to be called from a green thread self.node = ThorEntity(SIPConfig.local_ip, ['conference_server', 'xmpp_gateway'], version=sylk.__version__) self.networks = {} self.presence_message = ThorEvent('Thor.Presence', self.node.id) self.shutdown_message = ThorEvent('Thor.Leave', self.node.id) credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca]) credentials.verify_peer = True credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL) EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials) def connectionLost(self, connector, reason): """Called when an event server connection goes away""" self.connections.discard(connector.transport) def connectionFailed(self, connector, reason): """Called when an event server connection has an unrecoverable error""" connector.failed = True available_connectors = set(c for c in self.connectors if not c.failed) if not available_connectors: log.fatal("All Thor Event Servers have unrecoverable errors.") - NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self, data=TimestampedNotificationData()) + NotificationCenter().post_notification('ThorNetworkGotFatalError', sender=self) def stop(self): # Needs to be called from a green thread self._shutdown() def _monitor_event_servers(self): def wrapped_func(): servers = self._get_event_servers() self._update_event_servers(servers) callInGreenThread(wrapped_func) return KeepRunning def _disconnect_all(self): for conn in self.connectors: conn.disconnect() def _shutdown(self): if self.disconnecting: return self.disconnecting = True self.dns_monitor.cancel() if self.advertiser: self.advertiser.cancel() if self.shutdown_message: self._publish(self.shutdown_message) requests = [conn.protocol.unsubscribe(*self.topics) for conn in self.connections] d = defer.DeferredList([request.deferred for request in requests]) block_on(d) self._disconnect_all() def handle_event(self, event): #print "Received event: %s" % event networks = self.networks role_map = ThorEntitiesRoleMap(event.message) # mapping between role names and lists of nodes with that role updated = False for role in ('sip_proxy', 'conference_server', 'xmpp_gateway'): try: network = networks[role] except KeyError: from thor import network as thor_network network = thor_network.new(ThorNodeConfig.multiply) networks[role] = network new_nodes = set([node.ip for node in role_map.get(role, [])]) old_nodes = set(network.nodes) added_nodes = new_nodes - old_nodes removed_nodes = old_nodes - new_nodes if removed_nodes: for node in removed_nodes: network.remove_node(node) plural = len(removed_nodes) != 1 and 's' or '' log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes))) updated = True if added_nodes: for node in added_nodes: network.add_node(node) plural = len(added_nodes) != 1 and 's' or '' log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes))) updated = True #print "Thor %s nodes: %s" % (role, str(network.nodes)) if updated: - NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=TimestampedNotificationData(networks=self.networks)) + NotificationCenter().post_notification('ThorNetworkGotUpdate', sender=self, data=NotificationData(networks=self.networks)) diff --git a/sylk/log.py b/sylk/log.py index fbe48ff..4d8db4c 100644 --- a/sylk/log.py +++ b/sylk/log.py @@ -1,305 +1,305 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # """ Logging support adapted from SIP SIMPLE Client logger. """ __all__ = ["Logger"] import datetime import os import sys from pprint import pformat from application import log from application.notification import IObserver, NotificationCenter from application.python import Null from application.system import makedirs from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.threading import run_in_thread from zope.interface import implements class Logger(object): implements(IObserver) # public methods # def __init__(self): self.stopped = False self.msrp_level = log.level.ERROR self._siptrace_filename = None self._siptrace_file = None self._siptrace_error = False self._siptrace_start_time = None self._siptrace_packet_count = 0 self._msrptrace_filename = None self._msrptrace_file = None self._msrptrace_error = False self._pjsiptrace_filename = None self._pjsiptrace_file = None self._pjsiptrace_error = False self._notifications_filename = None self._notifications_file = None self._notifications_error = False self._log_directory_error = False def start(self): # try to create the log directory try: self._init_log_directory() except Exception: pass # register to receive log notifications notification_center = NotificationCenter() notification_center.add_observer(self) self.stopped = False def stop(self): notification_center = NotificationCenter() notification_center.remove_observer(self) self.stopped = False self._close_files() @run_in_thread('log-io') def _close_files(self): # close sip trace file if self._siptrace_file is not None: self._siptrace_file.close() self._siptrace_file = None # close msrp trace file if self._msrptrace_file is not None: self._msrptrace_file.close() self._msrptrace_file = None # close pjsip trace file if self._pjsiptrace_file is not None: self._pjsiptrace_file.close() self._pjsiptrace_file = None # close notifications trace file if self._notifications_file is not None: self._notifications_file.close() self._notifications_file = None def handle_notification(self, notification): if self.stopped: return self._process_notification(notification) @run_in_thread('log-io') def _process_notification(self, notification): settings = SIPSimpleSettings() handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) handler = getattr(self, '_LH_%s' % notification.name, Null) handler(notification) if notification.name not in ('SIPEngineLog', 'SIPEngineSIPTrace') and settings.logs.trace_notifications: message = 'Notification name=%s sender=%s' % (notification.name, notification.sender) if notification.data is not None: message += '\n%s' % pformat(notification.data.__dict__) if settings.logs.trace_notifications: try: self._init_log_file('notifications') except Exception: pass else: self._notifications_file.write('%s [%s %d]: %s\n' % (datetime.datetime.now(), os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._notifications_file.flush() # notification handlers # def _NH_CFGSettingsObjectDidChange(self, notification): settings = SIPSimpleSettings() if notification.sender is settings: if 'logs.directory' in notification.data.modified: # sip trace if self._siptrace_file is not None: self._siptrace_file.close() self._siptrace_file = None # pjsip trace if self._pjsiptrace_file is not None: self._pjsiptrace_file.close() self._pjsiptrace_file = None # notifications trace if self._notifications_file is not None: self._notifications_file.close() self._notifications_file = None # try to create the log directory try: self._init_log_directory() except Exception: pass # log handlers # def _LH_SIPEngineSIPTrace(self, notification): settings = SIPSimpleSettings() if not settings.logs.trace_sip: return if self._siptrace_start_time is None: - self._siptrace_start_time = notification.data.timestamp + self._siptrace_start_time = notification.datetime self._siptrace_packet_count += 1 if notification.data.received: direction = "RECEIVED" else: direction = "SENDING" - buf = ["%s: Packet %d, +%s" % (direction, self._siptrace_packet_count, (notification.data.timestamp - self._siptrace_start_time))] + buf = ["%s: Packet %d, +%s" % (direction, self._siptrace_packet_count, (notification.datetime - self._siptrace_start_time))] buf.append("%(source_ip)s:%(source_port)d -(SIP over %(transport)s)-> %(destination_ip)s:%(destination_port)d" % notification.data.__dict__) buf.append(notification.data.data) buf.append('--') message = '\n'.join(buf) if settings.logs.trace_sip: try: self._init_log_file('siptrace') except Exception: pass else: - self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.data.timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._siptrace_file.flush() def _LH_SIPEngineLog(self, notification): settings = SIPSimpleSettings() if not settings.logs.trace_pjsip: return message = "(%(level)d) %(sender)14s: %(message)s" % notification.data.__dict__ if settings.logs.trace_pjsip: try: self._init_log_file('pjsiptrace') except Exception: pass else: - self._pjsiptrace_file.write('%s [%s %d] %s\n' % (notification.data.timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._pjsiptrace_file.write('%s [%s %d] %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._pjsiptrace_file.flush() def _LH_DNSLookupTrace(self, notification): settings = SIPSimpleSettings() if not settings.logs.trace_sip: return message = 'DNS lookup %(query_type)s %(query_name)s' % notification.data.__dict__ if notification.data.error is None: message += ' succeeded, ttl=%d: ' % notification.data.answer.ttl if notification.data.query_type == 'A': message += ", ".join(record.address for record in notification.data.answer) elif notification.data.query_type == 'SRV': message += ", ".join('%d %d %d %s' % (record.priority, record.weight, record.port, record.target) for record in notification.data.answer) elif notification.data.query_type == 'NAPTR': message += ", ".join('%d %d "%s" "%s" "%s" %s' % (record.order, record.preference, record.flags, record.service, record.regexp, record.replacement) for record in notification.data.answer) else: import dns.resolver message_map = {dns.resolver.NXDOMAIN: 'DNS record does not exist', dns.resolver.NoAnswer: 'DNS response contains no answer', dns.resolver.NoNameservers: 'no DNS name servers could be reached', dns.resolver.Timeout: 'no DNS response received, the query has timed out'} message += ' failed: %s' % message_map.get(notification.data.error.__class__, '') if settings.logs.trace_sip: try: self._init_log_file('siptrace') except Exception: pass else: - self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.data.timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._siptrace_file.flush() def _LH_MSRPTransportTrace(self, notification): settings = SIPSimpleSettings() if not settings.logs.trace_msrp: return arrow = {'incoming': '<--', 'outgoing': '-->'}[notification.data.direction] local_address = notification.sender.getHost() local_address = '%s:%d' % (local_address.host, local_address.port) remote_address = notification.sender.getPeer() remote_address = '%s:%d' % (remote_address.host, remote_address.port) message = '%s %s %s\n' % (local_address, arrow, remote_address) + notification.data.data if settings.logs.trace_msrp: try: self._init_log_file('msrptrace') except Exception: pass else: - self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.data.timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._msrptrace_file.flush() def _LH_MSRPLibraryLog(self, notification): settings = SIPSimpleSettings() if not settings.logs.trace_msrp: return if notification.data.level < self.msrp_level: return message = '%s%s' % (notification.data.level.prefix, notification.data.message) if settings.logs.trace_msrp: try: self._init_log_file('msrptrace') except Exception: pass else: - self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.data.timestamp, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) + self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message)) self._msrptrace_file.flush() # private methods # def _init_log_directory(self): settings = SIPSimpleSettings() log_directory = settings.logs.directory.normalized try: makedirs(log_directory) except Exception, e: if not self._log_directory_error: print "failed to create logs directory '%s': %s" % (log_directory, e) self._log_directory_error = True self._siptrace_error = True self._pjsiptrace_error = True self._notifications_error = True raise else: self._log_directory_error = False # sip trace if self._siptrace_filename is None: self._siptrace_filename = os.path.join(log_directory, 'sip_trace.txt') self._siptrace_error = False # msrp trace if self._msrptrace_filename is None: self._msrptrace_filename = os.path.join(log_directory, 'msrp_trace.txt') self._msrptrace_error = False # pjsip trace if self._pjsiptrace_filename is None: self._pjsiptrace_filename = os.path.join(log_directory, 'core_trace.txt') self._pjsiptrace_error = False # notifications trace if self._notifications_filename is None: self._notifications_filename = os.path.join(log_directory, 'notifications_trace.txt') self._notifications_error = False def _init_log_file(self, type): if getattr(self, '_%s_file' % type) is None: self._init_log_directory() filename = getattr(self, '_%s_filename' % type) try: setattr(self, '_%s_file' % type, open(filename, 'a')) except Exception, e: if not getattr(self, '_%s_error' % type): print "failed to create log file '%s': %s" % (filename, e) setattr(self, '_%s_error' % type, True) raise else: setattr(self, '_%s_error' % type, False) diff --git a/sylk/server.py b/sylk/server.py index ab66312..5559e4f 100644 --- a/sylk/server.py +++ b/sylk/server.py @@ -1,241 +1,240 @@ # Copyright (C) 2010-2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import sys from threading import Event from application import log -from application.notification import NotificationCenter +from application.notification import NotificationCenter, NotificationData from eventlet import api, proc from sipsimple.account import Account, BonjourAccount, AccountManager from sipsimple.application import SIPApplication from sipsimple.audio import AudioDevice, RootAudioBridge from sipsimple.configuration import ConfigurationError from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioMixer, Engine, SIPCoreError from sipsimple.lookup import DNSManager from sipsimple.session import SessionManager from sipsimple.storage import MemoryStorage from sipsimple.threading import ThreadManager from sipsimple.threading.green import run_in_green_thread -from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from sylk.applications import IncomingRequestHandler from sylk.configuration import SIPConfig, ThorNodeConfig from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension from sylk.log import Logger # Load extensions needed for integration with SIP SIMPLE SDK import sylk.extensions class SylkServer(SIPApplication): def __init__(self): self.logger = None self.request_handler = None self.stop_event = Event() def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) notification_center.add_observer(self, name='ThorNetworkGotFatalError') self.logger = Logger() Account.register_extension(AccountExtension) BonjourAccount.register_extension(BonjourAccountExtension) SIPSimpleSettings.register_extension(SylkServerSettingsExtension) try: SIPApplication.start(self, MemoryStorage()) except ConfigurationError, e: log.fatal("Error loading configuration: ",e) sys.exit(1) def _load_configuration(self): # Command line options settings = SIPSimpleSettings() settings.bonjour.enabled = '--use-bonjour' in sys.argv # Horrible hack, I know account_manager = AccountManager() account = Account("account@example.com") # an account is required by AccountManager # Disable MSRP ACM if we are using Bonjour account.msrp.connection_model = 'relay' if settings.bonjour.enabled else 'acm' account.save() account_manager.default_account = account @run_in_green_thread def _initialize_subsystems(self): account_manager = AccountManager() engine = Engine() notification_center = NotificationCenter() session_manager = SessionManager() settings = SIPSimpleSettings() self._load_configuration() - notification_center.post_notification('SIPApplicationWillStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('SIPApplicationWillStart', sender=self) if self.state == 'stopping': reactor.stop() return account = account_manager.default_account # initialize core notification_center.add_observer(self, sender=engine) options = dict(# general ip_address=SIPConfig.local_ip, user_agent=settings.user_agent, # SIP detect_sip_loops=False, udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None, tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None, tls_port=None, # TLS tls_protocol='TLSv1', tls_verify_server=False, tls_ca_file=None, tls_cert_file=None, tls_privkey_file=None, tls_timeout=3000, # rtp rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end), # audio codecs=list(settings.rtp.audio_codec_list), # logging log_level=settings.logs.pjsip_level, trace_sip=True, # events and requests to handle events={'conference': ['application/conference-info+xml'], 'presence': ['application/pidf+xml'], 'refer': ['message/sipfrag;version=2.0']}, incoming_events=set(['conference', 'presence']), incoming_requests=set(['MESSAGE']) ) try: engine.start(**options) except SIPCoreError: self.end_reason = 'engine failed' reactor.stop() return # initialize TLS try: engine.set_tls_options(port=settings.sip.tls_port if 'tls' in settings.sip.transport_list else None, protocol=settings.tls.protocol, verify_server=account.tls.verify_server if account else False, ca_file=settings.tls.ca_list.normalized if settings.tls.ca_list else None, cert_file=account.tls.certificate.normalized if account and account.tls.certificate else None, privkey_file=account.tls.certificate.normalized if account and account.tls.certificate else None, timeout=settings.tls.timeout) except Exception, e: - notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=TimestampedNotificationData(error=e)) + notification_center.post_notification('SIPApplicationFailedToStartTLS', sender=self, data=NotificationData(error=e)) # initialize audio objects voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999) self.voice_audio_device = AudioDevice(voice_mixer) self.voice_audio_bridge = RootAudioBridge(voice_mixer) self.voice_audio_bridge.add(self.voice_audio_device) # initialize middleware components account_manager.start() session_manager.start() notification_center.add_observer(self, name='CFGSettingsObjectDidChange') self.state = 'started' - notification_center.post_notification('SIPApplicationDidStart', sender=self, data=TimestampedNotificationData()) + notification_center.post_notification('SIPApplicationDidStart', sender=self) @run_in_green_thread def _shutdown_subsystems(self): # cleanup internals if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None # shutdown SIPThor interface sipthor_proc = proc.spawn(self._stop_sipthor) sipthor_proc.wait() # shutdown middleware components dns_manager = DNSManager() account_manager = AccountManager() session_manager = SessionManager() procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)] proc.waitall(procs) # shutdown engine engine = Engine() engine.stop() # TODO: timeout should be removed when the Engine is fixed so that it never hangs. -Saul try: with api.timeout(15): while True: notification = self._channel.wait() if notification.name == 'SIPEngineDidEnd': break except api.TimeoutError: pass # stop threads thread_manager = ThreadManager() thread_manager.stop() # stop the reactor reactor.stop() def _start_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode() def _stop_sipthor(self): if ThorNodeConfig.enabled: from sylk.interfaces.sipthor import ConferenceNode ConferenceNode().stop() def _NH_SIPApplicationFailedToStartTLS(self, notification): log.fatal("Couldn't set TLS options: %s" % notification.data.error) def _NH_SIPApplicationWillStart(self, notification): self.logger.start() settings = SIPSimpleSettings() if settings.logs.trace_sip and self.logger._siptrace_filename is not None: log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename) if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None: log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename) if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None: log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename) if settings.logs.trace_notifications and self.logger._notifications_filename is not None: log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename) def _NH_SIPApplicationDidStart(self, notification): engine = Engine() settings = SIPSimpleSettings() local_ip = SIPConfig.local_ip log.msg("SylkServer started, listening on:") for transport in settings.sip.transport_list: try: log.msg("%s:%d (%s)" % (local_ip, getattr(engine, '%s_port' % transport), transport.upper())) except TypeError: pass # Start request handler self.request_handler = IncomingRequestHandler() self.request_handler.start() # Start SIPThor interface proc.spawn(self._start_sipthor) def _NH_SIPApplicationWillEnd(self, notification): self.request_handler.stop() def _NH_SIPApplicationDidEnd(self, notification): self.logger.stop() self.stop_event.set() def _NH_SIPEngineGotException(self, notification): log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback) def _NH_ThorNetworkGotFatalError(self, notification): self.stop() diff --git a/sylk/session.py b/sylk/session.py index 060bd40..65a669b 100644 --- a/sylk/session.py +++ b/sylk/session.py @@ -1,573 +1,572 @@ # Copyright (C) 2011 AG Projects. See LICENSE for details. # from __future__ import with_statement import random from datetime import datetime from time import time -from application.notification import IObserver, NotificationCenter +from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null, limit from eventlet import api, coros, proc from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import Invitation, Subscription, SIPCoreError, sip_status_messages from sipsimple.core import ContactHeader, RouteHeader, SubjectHeader, FromHeader, ToHeader from sipsimple.core import SIPURI, SDPConnection, SDPSession from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.session import Session, SessionManager from sipsimple.session import SessionReplaceHandler, TransferHandler, DialogID, TransferInfo from sipsimple.session import InvitationDisconnectedError, MediaStreamDidFailError, InterruptSubscription, TerminateSubscription, SubscriptionError, SIPSubscriptionDidFail from sipsimple.session import transition_state from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread -from sipsimple.util import TimestampedNotificationData from twisted.internet import reactor from zope.interface import implements from sylk.configuration import SIPConfig class ConferenceHandler(object): implements(IObserver) def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None self._wakeup_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='DNSNameserversDidChange') notification_center.add_observer(self, name='SystemIPAddressDidChange') notification_center.add_observer(self, name='SystemDidWakeUpFromSleep') self._command_proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='DNSNameserversDidChange') notification_center.remove_observer(self, name='SystemIPAddressDidChange') notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._wakeup_timer is not None and self._wakeup_timer.active(): self._wakeup_timer.cancel() self._wakeup_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account.sip.outbound_proxy is not None: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError, e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: transport = route.transport parameters = {} if transport=='udp' else {'transport': transport} contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip, port=getattr(SIPConfig, 'local_%s_port' % transport), parameters=parameters) subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.get_uri()), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail, e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: - notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=TimestampedNotificationData(conference_info=conference_info)) + notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription, e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError, e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_DNSNameserversDidChange(self, notification): if self.active: self._resubscribe() def _NH_SystemIPAddressDidChange(self, notification): if self.active: self._resubscribe() def _NH_SystemDidWakeUpFromSleep(self, notification): if self._wakeup_timer is None: def wakeup_action(): if self.active: self._resubscribe() self._wakeup_timer = None self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize class ServerSession(Session): def init_incoming(self, invitation, data): notification_center = NotificationCenter() remote_sdp = invitation.sdp.proposed_remote self.proposed_streams = [] if remote_sdp: for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry(): try: stream = stream_type.new_from_sdp(self.account, remote_sdp, index) except InvalidStreamError: break except UnknownStreamError: continue else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True try: self.__dict__['subject'] = data.headers['Subject'].subject except KeyError: pass if 'Referred-By' in data.headers or 'Replaces' in data.headers: self.transfer_info = TransferInfo() if 'Referred-By' in data.headers: self.transfer_info.referred_by = data.headers['Referred-By'].body if 'Replaces' in data.headers: replaces_header = data.headers.get('Replaces') replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) session_manager = SessionManager() try: self.replaced_session = (session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id).next() except StopIteration: invitation.send_response(481) return else: self.transfer_info.replaced_dialog_id = replaced_dialog_id replace_handler = SessionReplaceHandler(self) replace_handler.start() notification_center.add_observer(self, sender=invitation) - notification_center.post_notification('SIPSessionNewIncoming', self, TimestampedNotificationData(streams=self.proposed_streams, headers=data.headers)) + notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers)) else: invitation.send_response(488) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, from_header, to_header, routes, streams, contact_header=None, is_focus=False, subject=None, extra_headers=[]): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() self._local_identity = from_header self._remote_identity = to_header self.conference = ConferenceHandler(self) self.transfer_handler = Null self.__dict__['subject'] = subject notification_center.add_observer(self, sender=self._invitation) - notification_center.post_notification('SIPSessionNewOutgoing', self, TimestampedNotificationData(streams=streams)) + notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams)) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 if contact_header is None: try: contact_uri = self.account.contact[self.route] except KeyError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return else: contact_header = ContactHeader(contact_uri) local_ip = contact_header.uri.host local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent) stun_addresses = [] for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(for_offer=True) local_sdp.media.append(media) stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S '))) if stun_addresses: local_sdp.connection.address = stun_addresses[0] route_header = RouteHeader(self.route.get_uri()) if is_focus: contact_header.parameters['isfocus'] = None if self.subject: extra_headers.append(SubjectHeader(self.subject)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=received_code, reason=received_reason, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: - notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData()) - notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason)) + notification_center.post_notification('SIPSessionGotRingIndication', self, ) + notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, - TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) + NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.greenlet = None self.end() return - notification_center.post_notification('SIPSessionWillStart', self, TimestampedNotificationData()) + notification_center.post_notification('SIPSessionWillStart', self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) [self._channel.send(notification) for notification in invitation_notifications] while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: - notification_center.post_notification('SIPSessionGotRingIndication', self, TimestampedNotificationData()) - notification_center.post_notification('SIPSessionGotProvisionalResponse', self, TimestampedNotificationData(code=notification.data.code, reason=notification.data.reason)) + notification_center.post_notification('SIPSessionGotRingIndication', self) + notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, - TimestampedNotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) + NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidFailError, api.TimeoutError), e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed out while starting' else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=received_code, reason=received_reason, error=error) except InvitationDisconnectedError, e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': - notification_center.post_notification('SIPSessionWillEnd', self, TimestampedNotificationData(originator=e.data.originator)) + notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) if e.data.originator == 'remote': - notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) + notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = datetime.now() - notification_center.post_notification('SIPSessionDidEnd', self, TimestampedNotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) + notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': - notification_center.post_notification('SIPSessionDidProcessTransaction', self, TimestampedNotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) + notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None - notification_center.post_notification('SIPSessionDidFail', self, TimestampedNotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) + notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError, e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=received_code, reason=received_reason, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = datetime.now() - notification_center.post_notification('SIPSessionDidStart', self, TimestampedNotificationData(streams=self.streams)) + notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold()