Page MenuHomePhabricator

No OneTemporary

This file is larger than 256 KB, so syntax highlighting was skipped.
diff --git a/sylk-server b/sylk-server
index 2528db6..bd83d85 100755
--- a/sylk-server
+++ b/sylk-server
@@ -1,117 +1,117 @@
#!/usr/bin/env python
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details
#
import os
import signal
import sys
from application import log
from application.configuration import ConfigFile
from application.process import process, ProcessError
from optparse import OptionParser
import sipsimple
import sylk
def main():
name = 'sylk-server'
fullname = 'SylkServer'
runtime_directory = '/var/run/sylkserver'
system_config_directory = '/etc/sylkserver'
default_pid = os.path.join(runtime_directory, 'server.pid')
default_config = sylk.configuration_filename if os.path.isfile(sylk.configuration_filename) else os.path.join(system_config_directory, sylk.configuration_filename)
parser = OptionParser(version='%%prog %s' % sylk.__version__)
parser.add_option('--no-fork', action='store_false', dest='fork', default=1,
help='run the process in the foreground (for debugging)')
parser.add_option('--pid', dest='pid_file',
help='pid file ("%s")' % default_pid, metavar='File')
parser.add_option('--config-file', dest='config_file', default=default_config,
help='path to configuration file to read ("%s")' % default_config,
metavar='File')
parser.add_option('--enable-bonjour', action='store_true', dest='enable_bonjour', default=False,
help='enable Bonjour services')
parser.add_option('--debug-memory', action='store_true', dest='debug_memory', default=False,
help='enable memory debugging (works only if --no-fork is specified)')
(options, args) = parser.parse_args()
path, configuration_filename = os.path.split(options.config_file)
if path:
system_config_directory = path
process.system_config_directory = system_config_directory
sylk.configuration_filename = process.config_file(options.config_file)
pid_file = options.pid_file or default_pid
# when run in foreground, do not require root access because of /var/run/sylkserver
if not options.fork:
process._runtime_directory = None
else:
try:
process.runtime_directory = runtime_directory
process.daemonize(pid_file)
- except ProcessError, e:
+ except ProcessError as e:
log.fatal('Cannot start {name}: {exception!s}'.format(name=fullname, exception=e))
sys.exit(1)
log.start_syslog(name)
from sylk.server import SylkServer
log.info('Starting {name} {module.__version__}, using SIP SIMPLE SDK {sipsimple.__version__}'.format(name=fullname, module=sylk, sipsimple=sipsimple))
config_file = ConfigFile(sylk.configuration_filename)
if config_file.files:
log.info('Reading configuration from {}'.format(', '.join(config_file.files)))
else:
log.info('Not reading any configuration files (using internal defaults)')
if not options.fork and options.debug_memory:
import atexit
from application.debug.memory import memory_dump
atexit.register(memory_dump)
server = SylkServer()
def stop_server(*args):
if not server.stopping_event.is_set():
log.info('Stopping {name}...'.format(name=fullname))
server.stop()
process.signals.add_handler(signal.SIGTERM, stop_server)
process.signals.add_handler(signal.SIGINT, stop_server)
def toggle_debugging(*args):
from sylk.configuration import ServerConfig
if log.level.current != log.level.DEBUG:
log.level.current = log.level.DEBUG
log.info('Switched logging level to DEBUG')
else:
log.info('Switched logging level to {}'.format(ServerConfig.log_level))
log.level.current = ServerConfig.log_level
process.signals.add_handler(signal.SIGUSR1, toggle_debugging)
try:
server.start(options)
- except Exception, e:
+ except Exception as e:
log.fatal('Failed to run {name}: {exception!s}'.format(name=fullname, exception=e))
log.exception()
sys.exit(1)
else:
while not server.stopping_event.wait(9999):
pass
server.stop_event.wait(5)
if server.stop_event.is_set():
log.info('{name} stopped'.format(name=fullname))
else:
log.info('Forcefully exiting {name}...'.format(name=fullname))
os._exit(1)
sys.exit(int(server.failed))
if __name__ == "__main__":
main()
diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py
index ccd683b..8a66b36 100644
--- a/sylk/applications/conference/__init__.py
+++ b/sylk/applications/conference/__init__.py
@@ -1,411 +1,411 @@
import os
import re
import shutil
from application.notification import IObserver, NotificationCenter
from application.python import Null
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, SIPCoreError
from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader
from sipsimple.lookup import DNSLookup
from sipsimple.streams import MediaStreamRegistry
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications import SylkApplication
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.applications.conference.room import Room
from sylk.applications.conference.web import ConferenceWeb
from sylk.bonjour import BonjourService
from sylk.configuration import ServerConfig, ThorNodeConfig
from sylk.session import Session, IllegalStateError
from sylk.web import server as web_server
class ACLValidationError(Exception): pass
class RoomNotFoundError(Exception): pass
class ConferenceApplication(SylkApplication):
implements(IObserver)
def __init__(self):
self._rooms = {}
self.invited_participants_map = {}
self.bonjour_focus_service = Null
self.bonjour_room_service = Null
self.web = Null
def start(self):
self.web = ConferenceWeb(self)
web_server.register_resource('conference', self.web.resource())
# cleanup old files
for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screensharing_images_dir):
try:
shutil.rmtree(path)
except EnvironmentError:
pass
if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference':
self.bonjour_focus_service = BonjourService(service='sipfocus')
self.bonjour_focus_service.start()
log.info("Bonjour publication started for service 'sipfocus'")
self.bonjour_room_service = BonjourService(service='sipuri', name='Conference Room', uri_user='conference')
self.bonjour_room_service.start()
self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants')
log.info("Bonjour publication started for service 'sipuri'")
def stop(self):
self.bonjour_focus_service.stop()
self.bonjour_room_service.stop()
def get_room(self, uri, create=False):
room_uri = '%s@%s' % (uri.user, uri.host)
try:
room = self._rooms[room_uri]
except KeyError:
if create:
room = Room(room_uri)
self._rooms[room_uri] = room
return room
else:
raise RoomNotFoundError
else:
return room
def remove_room(self, uri):
room_uri = '%s@%s' % (uri.user, uri.host)
self._rooms.pop(room_uri, None)
def validate_acl(self, room_uri, from_uri):
room_uri = '%s@%s' % (room_uri.user, room_uri.host)
cfg = get_room_config(room_uri)
if cfg.access_policy == 'allow,deny':
if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri):
return
raise ACLValidationError
else:
if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri):
raise ACLValidationError
def incoming_session(self, session):
log.info('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio']
chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat']
transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer']
if not audio_streams and not chat_streams and not transfer_streams:
log.info(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported')
session.reject(488)
return
audio_stream = audio_streams[0] if audio_streams else None
chat_stream = chat_streams[0] if chat_streams else None
transfer_stream = transfer_streams[0] if transfer_streams else None
try:
self.validate_acl(session.request_uri, session.remote_identity.uri)
except ACLValidationError:
log.info(u'Session rejected: unauthorized by access list')
session.reject(403)
return
if transfer_stream is not None:
try:
room = self.get_room(session.request_uri)
except RoomNotFoundError:
log.info(u'Session rejected: room not found')
session.reject(404)
return
if transfer_stream.direction == 'sendonly':
# file transfer 'pull'
try:
file = next(file for file in room.files if file.hash == transfer_stream.file_selector.hash)
except StopIteration:
log.info(u'Session rejected: requested file not found')
session.reject(404)
return
try:
transfer_stream.file_selector = file.file_selector
- except EnvironmentError, e:
+ except EnvironmentError as e:
log.info(u'Session rejected: error opening requested file: %s' % e)
session.reject(404)
return
else:
transfer_stream.handler.save_directory = os.path.join(ConferenceConfig.file_transfer_dir.normalized, room.uri)
NotificationCenter().add_observer(self, sender=session)
if audio_stream:
session.send_ring_indication()
streams = [stream for stream in (audio_stream, chat_stream, transfer_stream) if stream]
reactor.callLater(4 if audio_stream is not None else 0, self.accept_session, session, streams)
def incoming_subscription(self, subscribe_request, data):
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
if Null in (from_header, to_header):
subscribe_request.reject(400)
return
if subscribe_request.event != 'conference':
log.info(u'Subscription for event %s rejected: only conference event is supported' % subscribe_request.event)
subscribe_request.reject(489)
return
try:
self.validate_acl(data.request_uri, from_header.uri)
except ACLValidationError:
try:
self.validate_acl(to_header.uri, from_header.uri)
except ACLValidationError:
# Check if we need to skip the ACL because this was an invited participant
if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or
str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})):
log.info(u'Subscription rejected: unauthorized by access list')
subscribe_request.reject(403)
return
try:
room = self.get_room(data.request_uri)
except RoomNotFoundError:
try:
room = self.get_room(to_header.uri)
except RoomNotFoundError:
log.info(u'Subscription rejected: room not yet created')
subscribe_request.reject(480)
return
if not room.started:
log.info(u'Subscription rejected: room not started yet')
subscribe_request.reject(480)
else:
room.handle_incoming_subscription(subscribe_request, data)
def incoming_referral(self, refer_request, data):
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
refer_to_header = data.headers.get('Refer-To', Null)
if Null in (from_header, to_header, refer_to_header):
refer_request.reject(400)
return
log.info(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri))
try:
self.validate_acl(data.request_uri, from_header.uri)
except ACLValidationError:
log.info(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri)
refer_request.reject(403)
return
referral_handler = IncomingReferralHandler(refer_request, data)
referral_handler.start()
def incoming_message(self, message_request, data):
log.info(u'SIP MESSAGE is not supported, use MSRP media instead')
message_request.answer(405)
def accept_session(self, session, streams):
if session.state == 'incoming':
try:
session.accept(streams, is_focus=True)
except IllegalStateError:
pass
def add_participant(self, session, room_uri):
# Keep track of the invited participants, we must skip ACL policy
# for SUBSCRIBE requests
room_uri_str = '%s@%s' % (room_uri.user, room_uri.host)
log.info(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri))
d = self.invited_participants_map.setdefault(room_uri_str, {})
d.setdefault(str(session.remote_identity.uri), 0)
d[str(session.remote_identity.uri)] += 1
NotificationCenter().add_observer(self, sender=session)
room = self.get_room(room_uri, True)
room.start()
room.add_session(session)
def remove_participant(self, participant_uri, room_uri):
try:
room = self.get_room(room_uri)
except RoomNotFoundError:
pass
else:
log.info('Room %s - %s removed from conference' % (room_uri, participant_uri))
room.terminate_sessions(participant_uri)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidStart(self, notification):
session = notification.sender
room = self.get_room(session.request_uri, True)
room.start()
room.add_session(session)
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
session = notification.sender
notification.center.remove_observer(self, sender=session)
if session.direction == 'incoming':
room_uri = session.request_uri
else:
# Clear invited participants mapping
room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host)
d = self.invited_participants_map[room_uri_str]
d[str(session.remote_identity.uri)] -= 1
if d[str(session.remote_identity.uri)] == 0:
del d[str(session.remote_identity.uri)]
room_uri = session.local_identity.uri
# We could get this notifiction even if we didn't get SIPSessionDidStart
try:
room = self.get_room(room_uri)
except RoomNotFoundError:
return
if session in room.sessions:
room.remove_session(session)
if not room.stopping and room.empty:
self.remove_room(room_uri)
room.stop()
def _NH_SIPSessionDidFail(self, notification):
session = notification.sender
notification.center.remove_observer(self, sender=session)
log.info(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason))
class IncomingReferralHandler(object):
implements(IObserver)
def __init__(self, refer_request, data):
self._refer_request = refer_request
self._refer_headers = data.headers
self.room_uri = data.request_uri
self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host)
self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri)
self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper()
self.session = None
self.streams = []
def start(self):
if not self.refer_to_uri.startswith(('sip:', 'sips:')):
self.refer_to_uri = 'sip:%s' % self.refer_to_uri
try:
self.refer_to_uri = SIPURI.parse(self.refer_to_uri)
except SIPCoreError:
log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.reject(488)
return
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._refer_request)
if self.method == 'INVITE':
self._refer_request.accept()
settings = SIPSimpleSettings()
account = DefaultAccount()
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = self.refer_to_uri
lookup = DNSLookup()
notification_center.add_observer(self, sender=lookup)
lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
elif self.method == 'BYE':
log.info('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
self._refer_request.accept()
conference_application = ConferenceApplication()
conference_application.remove_participant(self.refer_to_uri, self.room_uri)
self._refer_request.end(200)
else:
self._refer_request.reject(488)
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_DNSLookupDidSucceed(self, notification):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=notification.sender)
account = DefaultAccount()
conference_application = ConferenceApplication()
try:
room = conference_application.get_room(self.room_uri)
except RoomNotFoundError:
log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.end(500)
return
active_media = set(room.active_media).intersection(('audio', 'chat'))
if not active_media:
log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.end(500)
return
for stream_type in active_media:
self.streams.append(MediaStreamRegistry.get(stream_type)())
self.session = Session(account)
notification_center.add_observer(self, sender=self.session)
original_from_header = self._refer_headers.get('From')
if original_from_header.display_name:
original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host)
else:
original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host)
from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call')
to_header = ToHeader(self.refer_to_uri)
extra_headers = []
if self._refer_headers.get('Referred-By', None) is not None:
extra_headers.append(Header.new(self._refer_headers.get('Referred-By')))
else:
extra_headers.append(Header('Referred-By', str(original_from_header.uri)))
if ThorNodeConfig.enabled:
extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
extra_headers.append(Header('X-Originator-From', str(original_from_header.uri)))
extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity))
route = notification.data.result[0]
self.session.connect(from_header, to_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers)
def _NH_DNSLookupDidFail(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
def _NH_SIPSessionGotRingIndication(self, notification):
if self._refer_request is not None:
self._refer_request.send_notify(180)
def _NH_SIPSessionGotProvisionalResponse(self, notification):
if self._refer_request is not None:
self._refer_request.send_notify(notification.data.code, notification.data.reason)
def _NH_SIPSessionDidStart(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
if self._refer_request is not None:
self._refer_request.end(200)
conference_application = ConferenceApplication()
conference_application.add_participant(self.session, self.room_uri)
log.info('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
self.session = None
self.streams = []
def _NH_SIPSessionDidFail(self, notification):
log.info('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason))
notification.center.remove_observer(self, sender=notification.sender)
if self._refer_request is not None:
self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code)
self.session = None
self.streams = []
def _NH_SIPSessionDidEnd(self, notification):
# If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
log.info('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
notification.center.remove_observer(self, sender=notification.sender)
if self._refer_request is not None:
self._refer_request.end(200)
self.session = None
self.streams = []
def _NH_SIPIncomingReferralDidEnd(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
self._refer_request = None
diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py
index 5681f6d..8a1550a 100644
--- a/sylk/applications/conference/room.py
+++ b/sylk/applications/conference/room.py
@@ -1,1079 +1,1079 @@
import os
import random
import shutil
import string
import weakref
from collections import Counter, deque
from glob import glob
from itertools import chain, count, cycle
from application.notification import IObserver, NotificationCenter
from application.python import Null
from application.system import makedirs
from eventlib import api, coros, proc
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.application import SIPApplication
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI
from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import conference
from sipsimple.streams import MediaStreamRegistry
from sipsimple.streams.msrp.chat import ChatIdentity, CPIMHeader, CPIMNamespace
from sipsimple.streams.msrp.filetransfer import FileSelector
from sipsimple.threading import run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.bonjour import BonjourService
from sylk.configuration import ServerConfig, ThorNodeConfig
from sylk.configuration.datatypes import URL
from sylk.resources import Resources
from sylk.session import Session, IllegalStateError
from sylk.web import server as web_server
def format_identity(identity):
uri = identity.uri
if identity.display_name:
return u'%s <%s@%s>' % (identity.display_name, uri.user, uri.host)
else:
return u'%s@%s' % (uri.user, uri.host)
class ScreenImage(object):
def __init__(self, room, sender):
self.room = weakref.ref(room)
self.room_uri = room.uri
self.sender = sender
self.filename = os.path.join(ConferenceConfig.screensharing_images_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10))))
self.url = URL(web_server.url + '/conference/' + room.uri + '/screensharing')
self.url.query_items['image'] = os.path.basename(self.filename)
self.state = None
self.timer = None
@property
def active(self):
return self.state == 'active'
@property
def idle(self):
return self.state == 'idle'
@run_in_thread('file-io')
def save(self, image):
makedirs(os.path.dirname(self.filename))
tmp_filename = self.filename + '.tmp'
try:
with open(tmp_filename, 'wb') as file:
file.write(image)
- except EnvironmentError, e:
+ except EnvironmentError as e:
log.info('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e))
else:
try:
os.rename(tmp_filename, self.filename)
except EnvironmentError:
pass
self.advertise()
@run_in_twisted_thread
def advertise(self):
if self.state == 'active':
self.timer.reset(10)
else:
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'active'
self.timer = reactor.callLater(10, self.stop_advertising)
room = self.room() or Null
room.dispatch_conference_info()
txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url)
room.dispatch_server_message(txt)
log.info(txt)
@run_in_twisted_thread
def stop_advertising(self):
if self.state != 'idle':
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'idle'
self.timer = None
room = self.room() or Null
room.dispatch_conference_info()
txt = '%s stopped sharing the screen' % format_identity(self.sender)
room.dispatch_server_message(txt)
log.info(txt)
class Room(object):
"""
Object representing a conference room, it will handle the message dispatching
among all the participants.
"""
implements(IObserver)
def __init__(self, uri):
self.config = get_room_config(uri)
self.uri = uri
self.identity = ChatIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room')
self.files = []
self.screen_images = {}
self.sessions = []
self.subscriptions = []
self.state = 'stopped'
self.incoming_message_queue = coros.queue()
self.message_dispatcher = None
self.audio_conference = None
self.moh_player = None
self.conference_info_payload = None
self.conference_info_version = count(1)
self.bonjour_services = Null
self.session_nickname_map = {}
self.last_nicknames_map = {}
self.participants_counter = Counter()
self.history = deque(maxlen=ConferenceConfig.history_size)
@property
def empty(self):
return len(self.sessions) == 0
@property
def started(self):
return self.state == 'started'
@property
def stopping(self):
return self.state in ('stopping', 'stopped')
@property
def active_media(self):
return set((stream.type for stream in chain(*(session.streams for session in self.sessions if session.streams))))
@property
def conference_info(self):
if self.conference_info_payload is None:
settings = SIPSimpleSettings()
conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent)
conference_description.conf_uris = conference.ConfUris()
conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation'))
if self.config.advertise_xmpp_support:
conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation'))
# TODO: add grouptextchat service uri
for number in self.config.pstn_access_numbers:
conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation'))
host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com'))
self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users())
self.conference_info_payload.version = next(self.conference_info_version)
user_count = len(self.participants_counter)
self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True)
users = conference.Users()
for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')):
try:
user = next(user for user in users if user.entity == str(session.remote_identity.uri))
except StopIteration:
display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name)
user = conference.User(str(session.remote_identity.uri), display_text=display_text)
user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host)
screen_image = self.screen_images.get(user_uri, None)
if screen_image is not None and screen_image.active:
user.screen_image_url = screen_image.url
users.add(user)
joining_info = conference.JoiningInfo(when=session.start_time)
holdable_streams = [stream for stream in session.streams if stream.hold_supported]
session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected')
display_text = self.session_nickname_map.get(session, session.remote_identity.display_name)
endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status)
for stream in session.streams:
if stream.type == 'file-transfer':
continue
endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream)))
user.add(endpoint)
self.conference_info_payload.users = users
if self.files:
files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, 'OK') for file in self.files)
self.conference_info_payload.conference_description.resources = conference.Resources(files=files)
return self.conference_info_payload.toxml()
def start(self):
if self.started:
return
if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference':
room_user = self.identity.uri.user
self.bonjour_services = BonjourService(service='sipuri', name='Conference Room %s' % room_user, uri_user=room_user)
self.bonjour_services.start()
self.message_dispatcher = proc.spawn(self._message_dispatcher)
self.audio_conference = AudioConference()
self.audio_conference.hold()
self.moh_player = MoHPlayer(self.audio_conference)
self.moh_player.start()
self.state = 'started'
def stop(self):
if not self.started:
return
self.state = 'stopping'
self.bonjour_services.stop()
self.bonjour_services = None
self.incoming_message_queue.send_exception(api.GreenletExit)
self.incoming_message_queue = None
self.message_dispatcher.kill(proc.ProcExit)
self.message_dispatcher = None
self.moh_player.stop()
self.moh_player = None
self.audio_conference = None
notification_center = NotificationCenter()
for subscription in self.subscriptions:
notification_center.remove_observer(self, sender=subscription)
subscription.end()
self.subscriptions = []
self.cleanup_files()
self.conference_info_payload = None
self.state = 'stopped'
@run_in_thread('file-io')
def cleanup_files(self):
path = os.path.join(ConferenceConfig.file_transfer_dir, self.uri)
try:
shutil.rmtree(path)
except EnvironmentError:
pass
path = os.path.join(ConferenceConfig.screensharing_images_dir, self.uri)
try:
shutil.rmtree(path)
except EnvironmentError:
pass
def _message_dispatcher(self):
"""Read from self.incoming_message_queue and dispatch the messages to other participants"""
while True:
session, message_type, data = self.incoming_message_queue.wait()
if message_type == 'message':
message = data.message
if message.sender.uri != session.remote_identity.uri:
continue
if message.content.startswith('?OTR:'):
continue
if message.timestamp is None:
message.timestamp = ISOTimestamp.utcnow()
message.sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), message.sender.display_name)
recipient = message.recipients[0]
private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri
if private:
self.dispatch_private_message(session, message)
else:
self.history.append(message)
self.dispatch_message(session, message)
elif message_type == 'composing_indication':
if data.sender.uri != session.remote_identity.uri:
continue
recipient = data.recipients[0]
private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri
if private:
self.dispatch_private_iscomposing(session, data)
else:
self.dispatch_iscomposing(session, data)
def dispatch_message(self, session, message):
for s in (s for s in self.sessions if s is not session):
try:
chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
continue
chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers)
def dispatch_private_message(self, session, message):
# Private messages are delivered to all sessions matching the recipient but also to the sender,
# for replication in clients
recipient = message.recipients[0]
for s in (s for s in self.sessions if s is not session and s.remote_identity.uri in (recipient.uri, session.remote_identity.uri)):
try:
chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
continue
chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers)
def dispatch_iscomposing(self, session, data):
identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name)
for s in (s for s in self.sessions if s is not session):
try:
chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
continue
chat_stream.send_composing_indication(data.state, data.refresh, sender=identity, recipients=[self.identity])
def dispatch_private_iscomposing(self, session, data):
identity = ChatIdentity(session.remote_identity.uri, session.remote_identity.display_name)
recipient_uri = data.recipients[0].uri
for s in (s for s in self.sessions if s is not session and s.remote_identity.uri == recipient_uri):
try:
chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
continue
chat_stream.send_composing_indication(data.state, data.refresh, sender=identity)
def dispatch_server_message(self, content, content_type='text/plain', exclude=None):
ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp')
message_type = CPIMHeader('Message-Type', ns, 'status')
for session in (session for session in self.sessions if session is not exclude):
try:
chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
continue
chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity], additional_headers=[message_type])
def dispatch_conference_info(self):
data = self.conference_info
for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'):
try:
subscription.push_content(conference.ConferenceDocument.content_type, data)
except (SIPCoreError, SIPCoreInvalidStateError):
pass
def dispatch_file(self, file):
sender_uri = file.sender.uri
for uri in set(session.remote_identity.uri for session in self.sessions if str(session.remote_identity.uri) != str(sender_uri)):
handler = FileTransferHandler(self)
handler.init_outgoing(uri, file)
def add_session(self, session):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=session)
self.sessions.append(session)
remote_uri = str(session.remote_identity.uri)
self.participants_counter[remote_uri] += 1
try:
chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=chat_stream)
try:
audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=audio_stream)
log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate,
audio_stream.local_rtp_address, audio_stream.local_rtp_port,
audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
if audio_stream.encryption.type != 'ZRTP':
# We don't listen for stream notifications early enough
if audio_stream.encryption.active:
log.info(u'Room %s - %s audio stream enabled %s encryption' % (self.uri,
format_identity(session.remote_identity),
audio_stream.encryption.type))
else:
log.info(u'Room %s - %s audio stream did not enable encryption' % (self.uri,
format_identity(session.remote_identity)))
try:
transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer')
except StopIteration:
pass
else:
transfer_handler = FileTransferHandler(self)
transfer_handler.init_incoming(transfer_stream)
if transfer_stream.direction == 'recvonly':
filename = os.path.basename(os.path.splitext(transfer_stream.file_selector.name)[0])
txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), filename,self.format_file_size(transfer_stream.file_selector.size))
else:
filename = os.path.basename(transfer_stream.file_selector.name)
txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), filename)
log.info(txt)
self.dispatch_server_message(txt)
if len(session.streams) == 1:
return
welcome_handler = WelcomeHandler(self, initial=True, session=session, streams=session.streams)
welcome_handler.run()
self.dispatch_conference_info()
if len(self.sessions) == 1:
log.info(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams)))
else:
log.info(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams)))
if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session)
if ServerConfig.enable_bonjour:
self._update_bonjour_presence()
def remove_session(self, session):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=session)
self.sessions.remove(session)
self.session_nickname_map.pop(session, None)
remote_uri = str(session.remote_identity.uri)
self.participants_counter[remote_uri] -= 1
if self.participants_counter[remote_uri] == 0:
del self.participants_counter[remote_uri]
self.last_nicknames_map.pop(remote_uri, None)
try:
chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat')
except StopIteration:
pass
else:
notification_center.remove_observer(self, sender=chat_stream)
try:
audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio')
except StopIteration:
pass
else:
notification_center.remove_observer(self, sender=audio_stream)
try:
self.audio_conference.remove(audio_stream)
except ValueError:
# User may hangup before getting bridged into the conference
pass
if len(self.audio_conference.streams) == 0:
self.moh_player.pause()
self.audio_conference.hold()
elif len(self.audio_conference.streams) == 1:
self.moh_player.play()
try:
next(stream for stream in session.streams if stream.type == 'file-transfer')
except StopIteration:
pass
else:
if len(session.streams) == 1:
return
self.dispatch_conference_info()
log.info(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session)))
if not self.sessions:
log.info(u'Room %s - Last participant left conference' % self.uri)
if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session)))
if ServerConfig.enable_bonjour:
self._update_bonjour_presence()
def terminate_sessions(self, uri):
if not self.started:
return
for session in (session for session in self.sessions if session.remote_identity.uri == uri):
session.end()
def handle_incoming_subscription(self, subscribe_request, data):
log.info('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri))
if subscribe_request.event != 'conference':
log.info('Room %s - Subscription for event %s rejected: only conference event is supported' % (self.uri, subscribe_request.event))
subscribe_request.reject(489)
return
NotificationCenter().add_observer(self, sender=subscribe_request)
self.subscriptions.append(subscribe_request)
try:
subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info)
- except SIPCoreError, e:
+ except SIPCoreError as e:
log.warning('Error accepting SIP subscription: %s' % e)
subscribe_request.end()
def _accept_proposal(self, session, streams):
try:
session.accept_proposal(streams)
except IllegalStateError:
pass
session.proposal_timer = None
def add_file(self, file):
self.dispatch_server_message('%s has uploaded file %s (%s)' % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size)))
self.files.append(file)
self.dispatch_conference_info()
if ConferenceConfig.push_file_transfer:
self.dispatch_file(file)
def add_screen_image(self, sender, image):
sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host)
screen_image = self.screen_images.setdefault(sender_uri, ScreenImage(self, sender))
screen_image.save(image)
def _update_bonjour_presence(self):
num = len(self.sessions)
if num == 0:
num_str = 'No'
elif num == 1:
num_str = 'One'
elif num == 2:
num_str = 'Two'
else:
num_str = str(num)
txt = u'%s participant%s' % (num_str, '' if num==1 else 's')
presence_state = BonjourPresenceState('available', txt)
if self.bonjour_services is Null:
# This is the room being published all the time
from sylk.applications.conference import ConferenceApplication
ConferenceApplication().bonjour_room_service.presence_state = presence_state
else:
self.bonjour_services.presence_state = presence_state
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_RTPStreamDidEnableEncryption(self, notification):
stream = notification.sender
session = stream.session
log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri,
format_identity(session.remote_identity),
stream.type,
stream.encryption.type))
def _NH_RTPStreamDidNotEnableEncryption(self, notification):
stream = notification.sender
session = stream.session
log.info(u'Room %s - %s %s stream did not enable encryption: %s' % (self.uri,
format_identity(session.remote_identity),
stream.type,
notification.data.reason))
def _NH_RTPStreamZRTPReceivedSAS(self, notification):
if not self.config.zrtp_auto_verify:
return
stream = notification.sender
session = stream.session
sas = notification.data.sas
# Send ZRTP SAS over the chat stream, if available
try:
chat_stream = next(stream for stream in session.streams if stream.type=='chat')
except StopIteration:
return
# Only send the message if there are no relays in between
secure_chat = chat_stream.transport == 'tls' and all(len(path)==1 for path in (chat_stream.msrp.full_local_path, chat_stream.msrp.full_remote_path))
if secure_chat:
txt = 'Received ZRTP Short Authentication String: %s' % sas
# Don't set the remote identity, that way it will appear as a private message
ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp')
message_type = CPIMHeader('Message-Type', ns, 'status')
chat_stream.send_message(txt, 'text/plain', sender=self.identity, additional_headers=[message_type])
def _NH_RTPStreamDidTimeout(self, notification):
stream = notification.sender
if stream.type != 'audio':
return
session = stream.session
log.info(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity)))
if session.streams == [stream]:
session.end()
def _NH_ChatStreamGotMessage(self, notification):
stream = notification.sender
data = notification.data
session = notification.sender.session
message = data.message
content_type = message.content_type.lower()
if content_type.startswith(('text/', 'image/')):
stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
self.incoming_message_queue.send((session, 'message', data))
elif content_type == 'application/blink-screensharing':
stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
self.add_screen_image(message.sender, message.content)
elif content_type == 'application/blink-zrtp-sas':
if not self.config.zrtp_auto_verify:
stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
return
try:
audio_stream = next(stream for stream in session.streams if stream.type=='audio' and stream.encryption.active and stream.encryption.type=='ZRTP')
except StopIteration:
stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
return
# Only trust it if there was a direct path and the transport is TLS
secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path))
remote_sas = str(message.content)
if remote_sas == audio_stream.encryption.zrtp.sas and secure_chat:
audio_stream.encryption.zrtp.verified = True
stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
else:
stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
else:
stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
def _NH_ChatStreamGotComposingIndication(self, notification):
stream = notification.sender
stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
data = notification.data
session = notification.sender.session
self.incoming_message_queue.send((session, 'composing_indication', data))
def _NH_ChatStreamGotNicknameRequest(self, notification):
nickname = notification.data.nickname
session = notification.sender.session
chunk = notification.data.chunk
if nickname:
if nickname in self.session_nickname_map.values() and (session not in self.session_nickname_map or self.session_nickname_map[session] != nickname):
notification.sender.reject_nickname(chunk, 425, 'Nickname reserved or already in use')
return
self.session_nickname_map[session] = nickname
self.last_nicknames_map[str(session.remote_identity.uri)] = nickname
else:
self.session_nickname_map.pop(session, None)
self.last_nicknames_map.pop(str(session.remote_identity.uri), None)
notification.sender.accept_nickname(chunk)
self.dispatch_conference_info()
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
subscription = notification.sender
try:
self.subscriptions.remove(subscription)
except ValueError:
pass
else:
notification.center.remove_observer(self, sender=subscription)
def _NH_SIPSessionDidChangeHoldState(self, notification):
session = notification.sender
if notification.data.originator == 'remote':
if notification.data.on_hold:
log.info(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity)))
else:
log.info(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity)))
self.dispatch_conference_info()
def _NH_SIPSessionNewProposal(self, notification):
if notification.data.originator == 'remote':
session = notification.sender
audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio']
chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat']
if not audio_streams and not chat_streams:
session.reject_proposal()
return
streams = [streams[0] for streams in (audio_streams, chat_streams) if streams]
timer = reactor.callLater(3, self._accept_proposal, session, streams)
old_timer = getattr(session, 'proposal_timer', None)
assert old_timer is None
session.proposal_timer = timer
def _NH_SIPSessionProposalRejected(self, notification):
if notification.data.originator == 'remote':
session = notification.sender
timer = getattr(session, 'proposal_timer', None)
if timer is not None:
timer.cancel()
session.proposal_timer = None
def _NH_SIPSessionHadProposalFailure(self, notification):
if notification.data.originator == 'remote':
session = notification.sender
timer = getattr(session, 'proposal_timer', None)
assert timer is not None
timer.cancel()
session.proposal_timer = None
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
session = notification.sender
for stream in notification.data.added_streams:
notification.center.add_observer(self, sender=stream)
txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type)
log.info(u'Room %s - %s' % (self.uri, txt))
self.dispatch_server_message(txt, exclude=session)
if stream.type == 'audio':
log.info(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate,
stream.local_rtp_address, stream.local_rtp_port,
stream.remote_rtp_address, stream.remote_rtp_port))
if stream.encryption.type != 'ZRTP':
# We don't listen for stream notifications early enough
if stream.encryption.active:
log.info(u'Room %s - %s %s stream enabled %s encryption' % (self.uri,
format_identity(session.remote_identity),
stream.type,
stream.encryption.type))
else:
log.info(u'Room %s - %s %s stream did not enable encryption' % (self.uri,
format_identity(session.remote_identity),
stream.type))
if notification.data.added_streams:
welcome_handler = WelcomeHandler(self, initial=False, session=session, streams=notification.data.added_streams)
welcome_handler.run()
for stream in notification.data.removed_streams:
notification.center.remove_observer(self, sender=stream)
txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type)
log.info(u'Room %s - %s' % (self.uri, txt))
self.dispatch_server_message(txt, exclude=session)
if stream.type == 'audio':
try:
self.audio_conference.remove(stream)
except ValueError:
# User may hangup before getting bridged into the conference
pass
if len(self.audio_conference.streams) == 0:
self.moh_player.pause()
self.audio_conference.hold()
elif len(self.audio_conference.streams) == 1:
self.moh_player.play()
if not session.streams:
log.info(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity)))
session.end()
self.dispatch_conference_info()
def _NH_SIPSessionTransferNewIncoming(self, notification):
log.info(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri)
notification.sender.reject_transfer(403)
def _NH_SIPSessionWillEnd(self, notification):
session = notification.sender
timer = getattr(session, 'proposal_timer', None)
if timer is not None and timer.active():
timer.cancel()
session.proposal_timer = None
@staticmethod
def format_stream_types(streams):
if not streams:
return ''
if len(streams) == 1:
txt = 'with %s' % streams[0].type
else:
txt = 'with %s' % ','.join(stream.type for stream in streams[:-1])
txt += ' and %s' % streams[-1:][0].type
return txt
@staticmethod
def format_conference_stream_type(stream):
if stream.type == 'chat':
return 'message'
return stream.type
@staticmethod
def format_session_duration(session):
if session.start_time:
duration = session.end_time - session.start_time
seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1
minutes, seconds = seconds / 60, seconds % 60
hours, minutes = minutes / 60, minutes % 60
hours += duration.days*24
if not minutes and not hours:
duration_text = '%d seconds' % seconds
elif not hours:
duration_text = '%02d:%02d' % (minutes, seconds)
else:
duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds)
else:
duration_text = '0s'
return duration_text
@staticmethod
def format_file_size(size):
infinite = float('infinity')
boundaries = [( 1024, '%d bytes', 1),
( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0),
( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0),
(10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)]
for boundary, format, divisor in boundaries:
if size < boundary:
return format % (size/divisor,)
else:
return "%d bytes" % size
class MoHPlayer(object):
implements(IObserver)
def __init__(self, conference):
self.conference = conference
self.files = None
self.paused = None
self._player = None
def start(self):
files = glob('%s/*.wav' % Resources.get('sounds/moh'))
if not files:
log.error(u'No files found, MoH is disabled')
return
random.shuffle(files)
self.files = cycle(files)
self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20)
self.paused = True
self.conference.bridge.add(self._player)
NotificationCenter().add_observer(self, sender=self._player)
def stop(self):
if self._player is None:
return
NotificationCenter().remove_observer(self, sender=self._player)
self._player.stop()
self.paused = True
self.conference.bridge.remove(self._player)
self.conference = None
def play(self):
if self._player is not None and self.paused:
self.paused = False
self._play_next_file()
def pause(self):
if self._player is not None and not self.paused:
self.paused = True
self._player.stop()
def _play_next_file(self):
self._player.filename = next(self.files)
self._player.play()
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_WavePlayerDidFail(self, notification):
if not self.paused:
self._play_next_file()
_NH_WavePlayerDidEnd = _NH_WavePlayerDidFail
class WelcomeHandler(object):
implements(IObserver)
def __init__(self, room, initial, session, streams):
self.room = room
self.initial = initial
self.session = session
self.streams = streams
self.procs = proc.RunningProcSet()
@run_in_green_thread
def run(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
for stream in self.streams:
if stream.type == 'audio':
self.procs.spawn(self.audio_welcome, stream)
elif stream.type == 'chat':
self.procs.spawn(self.chat_welcome, stream)
self.procs.waitall()
notification_center.remove_observer(self, sender=self.session)
self.session = None
self.streams = None
self.room = None
self.procs = None
def play_file_in_player(self, player, file, delay):
player.filename = file
player.pause_time = delay
try:
player.play().wait()
- except WavePlayerError, e:
+ except WavePlayerError as e:
log.warning(u'Error playing file %s: %s' % (file, e))
def audio_welcome(self, stream):
player = WavePlayer(stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
stream.bridge.add(player)
try:
if self.initial:
file = Resources.get('sounds/co_welcome_conference.wav')
self.play_file_in_player(player, file, 1)
user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri and any(stream for stream in s.streams if stream.type == 'audio')})
if user_count == 0:
file = Resources.get('sounds/co_only_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count == 1:
file = Resources.get('sounds/co_there_is_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count < 100:
file = Resources.get('sounds/co_there_are.wav')
self.play_file_in_player(player, file, 0.2)
if user_count <= 24:
file = Resources.get('sounds/bi_%d.wav' % user_count)
self.play_file_in_player(player, file, 0.1)
else:
file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10))
self.play_file_in_player(player, file, 0.1)
file = Resources.get('sounds/bi_%d.wav' % (user_count % 10))
self.play_file_in_player(player, file, 0.1)
file = Resources.get('sounds/co_more_participants.wav')
self.play_file_in_player(player, file, 0)
file = Resources.get('sounds/connected_tone.wav')
self.play_file_in_player(player, file, 0.1)
except proc.ProcExit:
# No need to remove the bridge from the stream, it's done automatically
pass
else:
stream.bridge.remove(player)
self.room.audio_conference.add(stream)
self.room.audio_conference.unhold()
if len(self.room.audio_conference.streams) == 1:
self.room.moh_player.play()
else:
self.room.moh_player.pause()
finally:
player.stop()
def chat_welcome(self, stream):
if self.initial:
txt = 'Welcome to SylkServer!'
else:
txt = ''
user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri})
if user_count == 0:
txt += ' You are the first participant'
else:
if user_count == 1:
txt += ' There is one more participant'
else:
txt += ' There are %s more participants' % user_count
txt += ' in this conference room.'
if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers or self.room.config.webrtc_gateway_url:
txt += '\n\nOther participants can join at these addresses:\n\n'
if self.room.config.pstn_access_numbers:
if len(self.room.config.pstn_access_numbers) == 1:
nums = self.room.config.pstn_access_numbers[0]
else:
nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1]
txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums
if self.room.config.advertise_xmpp_support:
txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri
txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri
txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri
if self.room.config.webrtc_gateway_url:
webrtc_url = str(self.room.config.webrtc_gateway_url).replace('$room', self.room.uri)
txt += ' - Using a WebRTC enabled browser go to %s\n' % webrtc_url
stream.send_message(txt, 'text/plain', sender=self.room.identity, recipients=[self.room.identity])
for msg in self.room.history:
stream.send_message(msg.content, msg.content_type, sender=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp)
# Send ZRTP SAS over the chat stream, if applicable
if self.room.config.zrtp_auto_verify:
session = stream.session
try:
audio_stream = next(stream for stream in session.streams if stream.type=='audio')
except StopIteration:
pass
else:
if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active:
# Only send the message if there are no relays in between
secure_chat = stream.transport == 'tls' and all(len(path)==1 for path in (stream.msrp.full_local_path, stream.msrp.full_remote_path))
sas = audio_stream.encryption.zrtp.sas
if sas is not None and secure_chat:
txt = 'Received ZRTP Short Authentication String: %s' % sas
# Don't set the remote identity, that way it will appear as a private message
ns = CPIMNamespace('urn:ag-projects:xml:ns:cpim', prefix='agp')
message_type = CPIMHeader('Message-Type', ns, 'status')
stream.send_message(txt, 'text/plain', sender=self.room.identity, additional_headers=[message_type])
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionWillEnd(self, notification):
self.procs.killall()
class RoomFile(object):
def __init__(self, name, hash, size, sender):
self.name = name
self.hash = hash
self.size = size
self.sender = sender
@property
def file_selector(self):
return FileSelector.for_file(self.name, hash=self.hash)
class FileTransferHandler(object):
implements(IObserver)
def __init__(self, room):
self.room = weakref.ref(room)
self.session = None
self.stream = None
self.handler = None
self.direction = None
def init_incoming(self, stream):
self.direction = 'incoming'
self.stream = stream
self.session = stream.session
self.handler = stream.handler
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.stream)
notification_center.add_observer(self, sender=self.handler)
@run_in_green_thread
def init_outgoing(self, destination, file):
self.direction = 'outgoing'
room = self.room()
if room is None:
return
settings = SIPSimpleSettings()
account = DefaultAccount()
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = SIPURI.new(destination)
lookup = DNSLookup()
try:
route = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()[0]
except (DNSLookupError, IndexError):
return
self.session = Session(account)
self.stream = MediaStreamRegistry.get('file-transfer')(file.file_selector, 'sendonly')
self.handler = self.stream.handler
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.stream)
notification_center.add_observer(self, sender=self.handler)
from_header = FromHeader(SIPURI.new(room.identity.uri), u'Conference File Transfer')
to_header = ToHeader(SIPURI.new(destination))
extra_headers = []
if ThorNodeConfig.enabled:
extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
extra_headers.append(Header('X-Originator-From', str(file.sender.uri)))
extra_headers.append(SubjectHeader(u'File uploaded by %s' % file.sender))
self.session.connect(from_header, to_header, route=route, streams=[self.stream], is_focus=True, extra_headers=extra_headers)
def _terminate(self, failure_reason=None):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.stream)
notification_center.remove_observer(self, sender=self.handler)
room = self.room()
if room is not None:
if failure_reason is None:
if self.direction == 'incoming' and self.stream.direction == 'recvonly':
sender = ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name)
file = RoomFile(self.stream.file_selector.name, self.stream.file_selector.hash, self.stream.file_selector.size, sender)
room.add_file(file)
else:
room.dispatch_server_message('File transfer for %s failed: %s' % (os.path.basename(self.stream.file_selector.name), failure_reason))
self.session = None
self.stream = None
self.handler = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
self._terminate(failure_reason=notification.data.reason)
def _NH_FileTransferHandlerDidEnd(self, notification):
if self.direction == 'incoming':
if self.stream.direction == 'sendonly':
reactor.callLater(3, self.session.end)
else:
reactor.callLater(1, self.session.end)
else:
self.session.end()
self._terminate(failure_reason=notification.data.reason)
diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py
index 08c05fe..3aaa7b1 100644
--- a/sylk/applications/ircconference/room.py
+++ b/sylk/applications/ircconference/room.py
@@ -1,672 +1,672 @@
import random
import urllib
import lxml.html
import lxml.html.clean
from itertools import count
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from eventlib import coros, proc
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError
from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage
from sipsimple.streams.msrp.chat import ChatIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import protocol, reactor
from twisted.words.protocols import irc
from zope.interface import implements
from sylk.applications.ircconference.configuration import get_room_configuration
from sylk.applications.ircconference.logger import log
from sylk.resources import Resources
def format_identity(identity):
uri = identity.uri
if identity.display_name:
return u'%s <sip:%s@%s>' % (identity.display_name, uri.user, uri.host)
else:
return u'sip:%s@%s' % (uri.user, uri.host)
def format_stream_types(streams):
if not streams:
return ''
if len(streams) == 1:
txt = 'with %s' % streams[0].type
else:
txt = 'with %s' % ','.join(stream.type for stream in streams[:-1])
txt += ' and %s' % streams[-1:][0].type
return txt
def format_session_duration(session):
if session.start_time:
duration = session.end_time - session.start_time
seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1
minutes, seconds = seconds / 60, seconds % 60
hours, minutes = minutes / 60, minutes % 60
hours += duration.days*24
if not minutes and not hours:
duration_text = '%d seconds' % seconds
elif not hours:
duration_text = '%02d:%02d' % (minutes, seconds)
else:
duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds)
else:
duration_text = '0s'
return duration_text
def format_conference_stream_type(stream):
if stream.type == 'chat':
return 'message'
return stream.type
def html2text(data):
try:
doc = lxml.html.document_fromstring(data)
cleaner = lxml.html.clean.Cleaner(style=True)
doc = cleaner.clean_html(doc)
return doc.text_content().strip('\n')
except Exception:
return ''
class IRCMessage(object):
def __init__(self, username, uri, content, content_type='text/plain'):
self.sender = ChatIdentity(uri, display_name=username)
self.content = content
self.content_type = content_type
class IRCRoom(object):
"""
Object representing a conference room, it will handle the message dispatching
among all the participants.
"""
__metaclass__ = Singleton
implements(IObserver)
def __init__(self, uri):
self.uri = uri
self.identity = ChatIdentity.parse('<sip:%s>' % self.uri)
self.sessions = []
self.sessions_with_proposals = []
self.subscriptions = []
self.pending_messages = []
self.state = 'stopped'
self.incoming_message_queue = coros.queue()
self.message_dispatcher = None
self.audio_conference = None
self.conference_info_payload = None
self.conference_info_version = count(1)
self.irc_connector = None
self.irc_protocol = None
@classmethod
def get_room(cls, uri):
room_uri = '%s@%s' % (uri.user, uri.host)
room = cls(room_uri)
return room
@property
def empty(self):
return len(self.sessions) == 0
@property
def started(self):
return self.state == 'started'
def start(self):
if self.state != 'stopped':
return
config = get_room_configuration(self.uri.split('@')[0])
factory = IRCBotFactory(config)
host, port = config.server
self.irc_connector = reactor.connectTCP(host, port, factory)
NotificationCenter().add_observer(self, sender=self.irc_connector.factory)
self.message_dispatcher = proc.spawn(self._message_dispatcher)
self.audio_conference = AudioConference()
self.audio_conference.hold()
self.state = 'started'
def stop(self):
if self.state != 'started':
return
self.state = 'stopped'
NotificationCenter().remove_observer(self, sender=self.irc_connector.factory)
self.irc_connector.factory.stop_requested = True
self.irc_connector.disconnect()
self.irc_connector = None
self.message_dispatcher.kill(proc.ProcExit)
self.audio_conference = None
def _message_dispatcher(self):
"""Read from self.incoming_message_queue and dispatch the messages to other participants"""
while True:
session, message_type, data = self.incoming_message_queue.wait()
if message_type == 'msrp_message':
if data.sender.uri != session.remote_identity.uri:
return
self.dispatch_message(session, data)
elif message_type == 'irc_message':
self.dispatch_irc_message(data)
def dispatch_message(self, session, message):
identity = ChatIdentity.parse(format_identity(session.remote_identity))
for s in (s for s in self.sessions if s is not session):
try:
chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
pass
else:
chat_stream.send_message(message.content, message.content_type, sender=identity, recipients=[self.identity], timestamp=message.timestamp)
def dispatch_irc_message(self, message):
for session in self.sessions:
try:
chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
pass
else:
chat_stream.send_message(message.content, message.content_type, sender=message.sender, recipients=[self.identity])
def dispatch_server_message(self, content, content_type='text/plain', exclude=None):
for session in (session for session in self.sessions if session is not exclude):
try:
chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
pass
else:
chat_stream.send_message(content, content_type, sender=self.identity, recipients=[self.identity])
def get_conference_info(self):
# Send request to get participants list, we'll get a notification with it
if self.irc_protocol is not None:
self.irc_protocol.get_participants()
else:
self.dispatch_conference_info([])
def dispatch_conference_info(self, irc_participants):
data = self.build_conference_info_payload(irc_participants)
for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'):
try:
subscription.push_content(ConferenceDocument.content_type, data)
except (SIPCoreError, SIPCoreInvalidStateError):
pass
def build_conference_info_payload(self, irc_participants):
irc_configuration = get_room_configuration(self.uri.split('@')[0])
if self.conference_info_payload is None:
settings = SIPSimpleSettings()
conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent)
host_info = HostInfo(web_page=WebPage(irc_configuration.website))
self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users())
self.conference_info_payload.version = next(self.conference_info_version)
user_count = len({str(s.remote_identity.uri) for s in self.sessions}) + len(irc_participants)
self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True)
users = Users()
for session in self.sessions:
try:
user = next(user for user in users if user.entity == str(session.remote_identity.uri))
except StopIteration:
user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name)
users.add(user)
joining_info = JoiningInfo(when=session.start_time)
holdable_streams = [stream for stream in session.streams if stream.hold_supported]
session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected')
endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status)
for stream in session.streams:
endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream)))
user.add(endpoint)
for nick in irc_participants:
irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0])
user = User(irc_uri, display_text=nick)
users.add(user)
endpoint = Endpoint(irc_uri, display_text=nick)
endpoint.add(Media(random.randint(100000000, 999999999), media_type='message'))
user.add(endpoint)
self.conference_info_payload.users = users
return self.conference_info_payload.toxml()
def add_session(self, session):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=session)
self.sessions.append(session)
try:
chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=chat_stream)
try:
audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=audio_stream)
log.info(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate,
audio_stream.local_rtp_address, audio_stream.local_rtp_port,
audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
welcome_handler = WelcomeHandler(self, session)
welcome_handler.start()
self.get_conference_info()
if len(self.sessions) == 1:
log.info(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
else:
log.info(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session)
def remove_session(self, session):
notification_center = NotificationCenter()
try:
chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat')
except StopIteration:
pass
else:
notification_center.remove_observer(self, sender=chat_stream)
try:
audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio')
except StopIteration:
pass
else:
notification_center.remove_observer(self, sender=audio_stream)
try:
self.audio_conference.remove(audio_stream)
except ValueError:
# User may hangup before getting bridged into the conference
pass
if len(self.audio_conference.streams) == 0:
self.audio_conference.hold()
notification_center.remove_observer(self, sender=session)
self.sessions.remove(session)
self.get_conference_info()
log.info(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session)))
if not self.sessions:
log.info(u'Last participant left conference %s' % self.uri)
if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session)))
def accept_proposal(self, session, streams):
if session in self.sessions_with_proposals:
session.accept_proposal(streams)
self.sessions_with_proposals.remove(session)
def handle_incoming_subscription(self, subscribe_request, data):
if subscribe_request.event != 'conference':
subscribe_request.reject(489)
return
NotificationCenter().add_observer(self, sender=subscribe_request)
self.subscriptions.append(subscribe_request)
try:
subscribe_request.accept()
- except SIPCoreError, e:
+ except SIPCoreError as e:
log.warning('Error accepting SIP subscription: %s' % e)
subscribe_request.end()
self.get_conference_info()
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
subscription = notification.sender
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=subscription)
self.subscriptions.remove(subscription)
def _NH_SIPSessionDidChangeHoldState(self, notification):
session = notification.sender
if notification.data.originator == 'remote':
if notification.data.on_hold:
log.info(u'%s has put the audio session on hold' % format_identity(session.remote_identity))
else:
log.info(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity))
self.get_conference_info()
def _NH_SIPSessionNewProposal(self, notification):
if notification.data.originator == 'remote':
session = notification.sender
audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio']
chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat']
if not audio_streams and not chat_streams:
session.reject_proposal()
return
if chat_streams:
chat_streams[0].chatroom_capabilities = []
streams = [streams[0] for streams in (audio_streams, chat_streams) if streams]
self.sessions_with_proposals.append(session)
reactor.callLater(4, self.accept_proposal, session, streams)
def _NH_SIPSessionProposalRejected(self, notification):
session = notification.sender
self.sessions_with_proposals.remove(session)
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
session = notification.sender
for stream in notification.data.added_streams:
notification.center.add_observer(self, sender=stream)
log.info(u'%s has added %s to %s' % (format_identity(session.remote_identity), stream.type, self.uri))
self.dispatch_server_message('%s has added %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
if stream.type == 'audio':
log.info(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (stream.codec, stream.sample_rate,
stream.local_rtp_address, stream.local_rtp_port,
stream.remote_rtp_address, stream.remote_rtp_port))
welcome_handler = WelcomeHandler(self, session)
welcome_handler.start(welcome_prompt=False)
for stream in notification.data.removed_streams:
notification.center.remove_observer(self, sender=stream)
log.info(u'%s has removed %s from %s' % (format_identity(session.remote_identity), stream.type, self.uri))
self.dispatch_server_message('%s has removed %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
if stream.type == 'audio':
try:
self.audio_conference.remove(stream)
except ValueError:
# User may hangup before getting bridged into the conference
pass
if len(self.audio_conference.streams) == 0:
self.audio_conference.hold()
if not session.streams:
log.info(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri))
session.end()
self.get_conference_info()
def _NH_RTPStreamDidTimeout(self, notification):
stream = notification.sender
if stream.type != 'audio':
return
session = stream.session
log.info(u'Audio stream for session %s timed out' % format_identity(session.remote_identity))
if session.streams == [stream]:
session.end()
def _NH_ChatStreamGotMessage(self, notification):
stream = notification.sender
message = notification.data.message
if message.content_type not in ('text/html', 'text/plain'):
log.info(u'Unsupported content type: %s, ignoring message' % message.content_type)
stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
return
stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
# Send MSRP chat message to other participants
session = stream.session
self.incoming_message_queue.send((session, 'msrp_message', message))
# Send MSRP chat message to IRC chat room
if message.content_type == 'text/html':
content = html2text(message.content)
elif message.content_type == 'text/plain':
content = message.content
else:
log.warning('unexpected message type: %s' % message.content_type)
return
sender = message.sender
irc_message = '%s: %s' % (format_identity(sender), content)
if self.irc_protocol is not None:
self.irc_protocol.send_message(irc_message.encode('utf-8'))
else:
self.pending_messages.append(irc_message)
def _NH_ChatStreamGotNicknameRequest(self, notification):
# Discard the nickname but pretend we accept it so that XMPP clients can work
chunk = notification.data.chunk
notification.sender.accept_nickname(chunk)
def _NH_IRCBotGotConnected(self, notification):
self.irc_protocol = notification.data.protocol
# Send enqueued messages
while self.pending_messages:
message = self.pending_messages.pop(0)
self.irc_protocol.send_message(message.encode('utf-8'))
# Update participants list
self.get_conference_info()
def _NH_IRCBotGotDisconnected(self, notification):
self.irc_protocol = None
def _NH_IRCBotGotMessage(self, notification):
message = notification.data.message
self.incoming_message_queue.send((None, 'irc_message', message))
def _NH_IRCBotGotParticipantsList(self, notification):
self.dispatch_conference_info(notification.data.participants)
def _NH_IRCBotJoinedChannel(self, notification):
self.get_conference_info()
def _NH_IRCBotUserJoined(self, notification):
self.dispatch_server_message('%s joined the IRC channel' % notification.data.user)
self.get_conference_info()
def _NH_IRCBotUserLeft(self, notification):
self.dispatch_server_message('%s left the IRC channel' % notification.data.user)
self.get_conference_info()
def _NH_IRCBotUserQuit(self, notification):
self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason))
self.get_conference_info()
def _NH_IRCBotUserKicked(self, notification):
data = notification.data
self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason))
self.get_conference_info()
def _NH_IRCBotUserRenamed(self, notification):
self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname))
self.get_conference_info()
def _NH_IRCBotUserAction(self, notification):
self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action))
class WelcomeHandler(object):
implements(IObserver)
def __init__(self, room, session):
self.room = room
self.session = session
self.proc = None
@run_in_green_thread
def start(self, welcome_prompt=True):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
self.proc = proc.spawn(self.play_audio_welcome, welcome_prompt)
self.proc.wait()
notification_center.remove_observer(self, sender=self.session)
self.session = None
self.room = None
self.proc = None
def play_file_in_player(self, player, file, delay):
player.filename = file
player.pause_time = delay
try:
player.play().wait()
- except WavePlayerError, e:
+ except WavePlayerError as e:
log.warning(u'Error playing file %s: %s' % (file, e))
def play_audio_welcome(self, welcome_prompt):
try:
audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio')
except StopIteration:
return
player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
audio_stream.bridge.add(player)
try:
if welcome_prompt:
file = Resources.get('sounds/co_welcome_conference.wav')
self.play_file_in_player(player, file, 1)
user_count = len({str(s.remote_identity.uri) for s in self.room.sessions if s.remote_identity.uri != self.session.remote_identity.uri and any(stream for stream in s.streams if stream.type == 'audio')})
if user_count == 0:
file = Resources.get('sounds/co_only_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count == 1:
file = Resources.get('sounds/co_there_is_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count < 100:
file = Resources.get('sounds/co_there_are.wav')
self.play_file_in_player(player, file, 0.2)
if user_count <= 24:
file = Resources.get('sounds/bi_%d.wav' % user_count)
self.play_file_in_player(player, file, 0.1)
else:
file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10))
self.play_file_in_player(player, file, 0.1)
file = Resources.get('sounds/bi_%d.wav' % (user_count % 10))
self.play_file_in_player(player, file, 0.1)
file = Resources.get('sounds/co_more_participants.wav')
self.play_file_in_player(player, file, 0)
file = Resources.get('sounds/connected_tone.wav')
self.play_file_in_player(player, file, 0.1)
except proc.ProcExit:
# No need to remove the bridge from the stream, it's done automatically
pass
else:
audio_stream.bridge.remove(player)
self.room.audio_conference.add(audio_stream)
self.room.audio_conference.unhold()
finally:
player.stop()
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionWillEnd(self, notification):
self.proc.kill()
class IRCBot(irc.IRCClient):
nickname = 'SylkServer'
def __init__(self):
self._nick_collector = []
self.nicks = []
def connectionMade(self):
irc.IRCClient.connectionMade(self)
log.info('Connection to IRC has been established')
NotificationCenter().post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self))
def connectionLost(self, failure):
irc.IRCClient.connectionLost(self, failure)
NotificationCenter().post_notification('IRCBotGotDisconnected', self.factory, NotificationData())
def signedOn(self):
log.info('Logging into %s channel...' % self.factory.channel)
self.join(self.factory.channel)
def kickedFrom(self, channel, kicker, message):
log.info('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message))
self.join(self.factory.channel)
def joined(self, channel):
log.info('Logged into %s channel' % channel)
NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, NotificationData(channel=self.factory.channel))
def privmsg(self, user, channel, message):
if channel == '*':
return
username = user.split('!', 1)[0]
if username == self.nickname:
return
if channel == self.nickname:
self.msg(username, "Sorry, I don't support private messages, I'm a bot.")
return
uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0]))
irc_message = IRCMessage(username, uri, message.decode('utf-8'))
data = NotificationData(message=irc_message)
NotificationCenter().post_notification('IRCBotGotMessage', self.factory, data)
def send_message(self, message):
self.say(self.factory.channel, message)
def get_participants(self):
self.sendLine("NAMES #%s" % self.factory.channel)
def got_participants(self, nicks):
data = NotificationData(participants=nicks)
NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, data)
def irc_RPL_NAMREPLY(self, prefix, params):
"""Collect usernames from this channel. Several of these
messages may be sent to cover the channel's full nicklist.
An RPL_ENDOFNAMES signals the end of the list.
"""
# We just separate these into individual nicks and stuff them in
# the nickCollector, transferred to 'nicks' when we get the RPL_ENDOFNAMES.
for name in params[3].split():
# Remove operator and voice prefixes
if name[0] in '@+':
name = name[1:]
if name != self.nickname:
self._nick_collector.append(name)
def irc_RPL_ENDOFNAMES(self, prefix, params):
"""This is sent after zero or more RPL_NAMREPLY commands to
terminate the list of users in a channel.
"""
self.nicks = self._nick_collector
self._nick_collector = []
self.got_participants(self.nicks)
def userJoined(self, user, channel):
if channel.strip('#') == self.factory.channel:
data = NotificationData(user=user)
NotificationCenter().post_notification('IRCBotUserJoined', self.factory, data)
def userLeft(self, user, channel):
if channel.strip('#') == self.factory.channel:
data = NotificationData(user=user)
NotificationCenter().post_notification('IRCBotUserLeft', self.factory, data)
def userQuit(self, user, reason):
data = NotificationData(user=user, reason=reason)
NotificationCenter().post_notification('IRCBotUserQuit', self.factory, data)
def userKicked(self, kickee, channel, kicker, message):
if channel.strip('#') == self.factory.channel:
data = NotificationData(kickee=kickee, kicker=kicker, reason=message)
NotificationCenter().post_notification('IRCBotUserKicked', self.factory, data)
def userRenamed(self, oldname, newname):
data = NotificationData(oldname=oldname, newname=newname)
NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, data)
def action(self, user, channel, data):
if channel.strip('#') == self.factory.channel:
username = user.split('!', 1)[0]
data = NotificationData(user=username, action=data)
NotificationCenter().post_notification('IRCBotUserAction', self.factory, data)
class IRCBotFactory(protocol.ClientFactory):
protocol = IRCBot
def __init__(self, config):
self.config = config
self.channel = config.channel
self.stop_requested = False
def clientConnectionLost(self, connector, failure):
log.info('Disconnected from IRC: %s' % failure.getErrorMessage())
if not self.stop_requested:
log.info('Reconnecting...')
connector.connect()
def clientConnectionFailed(self, connector, failure):
log.error('Connection to IRC server failed: %s' % failure.getErrorMessage())
diff --git a/sylk/applications/playback/__init__.py b/sylk/applications/playback/__init__.py
index b626098..43dbb41 100644
--- a/sylk/applications/playback/__init__.py
+++ b/sylk/applications/playback/__init__.py
@@ -1,169 +1,169 @@
import os
from application.python import Null
from application.notification import IObserver, NotificationCenter
from eventlib import proc
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.audio import WavePlayer, WavePlayerError
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications import SylkApplication, ApplicationLogger
from sylk.applications.playback.configuration import get_config
from sylk.bonjour import BonjourService
from sylk.configuration import ServerConfig
log = ApplicationLogger(__package__)
class PlaybackApplication(SylkApplication):
def start(self):
self.bonjour_services = []
if ServerConfig.enable_bonjour:
application_map = dict((item.split(':')) for item in ServerConfig.application_map)
for uri, app in application_map.iteritems():
if app == 'playback':
config = get_config('%s' % uri)
if config is None:
continue
if os.path.isfile(config.file) and os.access(config.file, os.R_OK):
service = BonjourService(service='sipuri', name='Playback Test', uri_user=uri, is_focus=False)
service.start()
service.presence_state = BonjourPresenceState('available', u'File: %s' % os.path.basename(config.file))
self.bonjour_services.append(service)
def stop(self):
for service in self.bonjour_services:
service.stop()
del self.bonjour_services[:]
def incoming_session(self, session):
log.info('Incoming session %s from %s to %s' % (session.call_id, session.remote_identity.uri, session.local_identity.uri))
config = get_config('%s@%s' % (session.request_uri.user, session.request_uri.host))
if config is None:
config = get_config('%s' % session.request_uri.user)
if config is None:
log.info(u'Session %s rejected: no configuration found for %s' % (session.call_id, session.request_uri))
session.reject(488)
return
stream_types = {'audio'}
if config.enable_video:
stream_types.add('video')
streams = [stream for stream in session.proposed_streams if stream.type in stream_types]
if not streams:
log.info(u'Session %s rejected: invalid media, only RTP audio and video is supported' % session.call_id)
session.reject(488)
return
handler = PlaybackHandler(config, session)
handler.run()
def incoming_subscription(self, request, data):
request.reject(405)
def incoming_referral(self, request, data):
request.reject(405)
def incoming_message(self, request, data):
request.reject(405)
class PlaybackHandler(object):
implements(IObserver)
def __init__(self, config, session):
self.config = config
self.session = session
self.proc = None
def run(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
self.session.send_ring_indication()
stream_types = {'audio'}
if self.config.enable_video:
stream_types.add('video')
streams = [stream for stream in self.session.proposed_streams if stream.type in stream_types]
reactor.callLater(self.config.answer_delay, self._accept_session, self.session, streams)
def _accept_session(self, session, streams):
if session.state == 'incoming':
session.accept(streams)
def _play(self):
config = get_config('%s@%s' % (self.session.request_uri.user, self.session.request_uri.host))
if config is None:
config = get_config('%s' % self.session.request_uri.user)
try:
audio_stream = next(stream for stream in self.session.streams if stream.type=='audio')
except StopIteration:
self.proc = None
return
player = WavePlayer(audio_stream.mixer, config.file)
audio_stream.bridge.add(player)
log.info(u'Playing file %s for session %s' % (config.file, self.session.call_id))
try:
player.play().wait()
- except (ValueError, WavePlayerError), e:
+ except (ValueError, WavePlayerError) as e:
log.warning(u'Error playing file %s: %s' % (config.file, e))
except proc.ProcExit:
pass
finally:
player.stop()
self.proc = None
audio_stream.bridge.remove(player)
self.session.end()
self.session = None
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionNewProposal(self, notification):
if notification.data.originator == 'remote':
session = notification.sender
stream_types = {'audio'}
if self.config.enable_video:
stream_types.add('video')
streams = [stream for stream in session.proposed_streams if stream.type in stream_types]
if not streams:
session.reject_proposal()
return
session.accept_proposal(streams)
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
session = notification.sender
for stream in notification.data.added_streams:
log.info('Session %s added %s' % (session.call_id, stream.type))
for stream in notification.data.removed_streams:
log.info('Session %s removed %s' % (session.call_id, stream.type))
if notification.data.added_streams and self.proc is None:
self.proc = proc.spawn(self._play)
if notification.data.removed_streams and not session.streams:
session.end()
def _NH_SIPSessionDidStart(self, notification):
session = notification.sender
log.info('Session %s started' % session.call_id)
self.proc = proc.spawn(self._play)
def _NH_SIPSessionDidFail(self, notification):
session = notification.sender
log.info('Session %s failed' % session.call_id)
notification.center.remove_observer(self, sender=session)
def _NH_SIPSessionWillEnd(self, notification):
if self.proc:
self.proc.kill()
def _NH_SIPSessionDidEnd(self, notification):
session = notification.sender
log.info('Session %s ended' % session.call_id)
notification.center.remove_observer(self, sender=session)
diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py
index 4e5fbbe..52eab6f 100644
--- a/sylk/applications/xmppgateway/muc.py
+++ b/sylk/applications/xmppgateway/muc.py
@@ -1,465 +1,465 @@
import uuid
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages
from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams import MediaStreamRegistry
from sipsimple.streams.msrp.chat import ChatStreamError, ChatIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession
from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS
from sylk.configuration import SIPConfig
from sylk.session import Session
class ReferralError(Exception):
def __init__(self, error, code=0):
self.error = error
self.code = code
class SIPReferralDidFail(Exception):
def __init__(self, data):
self.data = data
class MucInvitationFailure(object):
def __init__(self, code, reason):
self.code = code
self.reason = reason
def __str__(self):
return '%s (%s)' % (self.code, self.reason)
class X2SMucInvitationHandler(object):
implements(IObserver)
def __init__(self, sender, recipient, participant):
self.sender = sender
self.recipient = recipient
self.participant = participant
self.active = False
self.route = None
self._channel = coros.queue()
self._referral = None
self._failure = None
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, name='NetworkConditionsDidChange')
proc.spawn(self._run)
notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self)
def _run(self):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
sender_uri = self.sender.uri.as_sip_uri()
recipient_uri = self.recipient.uri.as_sip_uri()
participant_uri = self.participant.uri.as_sip_uri()
try:
# Lookup routes
account = DefaultAccount()
if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(recipient_uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
- except DNSLookupError, e:
+ except DNSLookupError as e:
raise ReferralError(error='DNS lookup failed: %s' % e)
timeout = time() + 30
for route in routes:
self.route = route
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
refer_to_header = ReferToHeader(str(participant_uri))
refer_to_header.parameters['method'] = 'INVITE'
referral = Referral(recipient_uri, FromHeader(sender_uri),
ToHeader(recipient_uri),
refer_to_header,
ContactHeader(contact_uri),
RouteHeader(route.uri),
account.credentials)
notification_center.add_observer(self, sender=referral)
try:
referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=referral)
timeout = 5
raise ReferralError(error='Internal error')
self._referral = referral
try:
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidStart':
break
- except SIPReferralDidFail, e:
+ except SIPReferralDidFail as e:
notification_center.remove_observer(self, sender=referral)
self._referral = None
if e.data.code in (403, 405):
raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
else:
# Otherwise just try the next route
continue
else:
break
else:
self.route = None
raise ReferralError(error='No more routes to try')
# At this point it is subscribed. Handle notifications and ending/failures.
try:
self.active = True
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidEnd':
break
- except SIPReferralDidFail, e:
+ except SIPReferralDidFail as e:
notification_center.remove_observer(self, sender=self._referral)
raise ReferralError(error=e.data.reason, code=e.data.code)
else:
notification_center.remove_observer(self, sender=self._referral)
finally:
self.active = False
- except ReferralError, e:
+ except ReferralError as e:
self._failure = MucInvitationFailure(e.code, e.error)
finally:
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._referral = None
if self._failure is not None:
notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
else:
notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self)
def _refresh(self):
account = DefaultAccount()
transport = self.route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
contact_header = ContactHeader(contact_uri)
self._referral.refresh(contact_header=contact_header, timeout=2)
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPReferralDidStart(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidEnd(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidFail(self, notification):
self._channel.send_exception(SIPReferralDidFail(notification.data))
def _NH_SIPReferralGotNotify(self, notification):
self._channel.send(notification)
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._refresh()
class S2XMucInvitationHandler(object):
implements(IObserver)
def __init__(self, session, sender, recipient, inviter):
self.session = session
self.sender = sender
self.recipient = recipient
self.inviter = inviter
self._timer = None
self._failure = None
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex)
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
self._timer = reactor.callLater(90, self._timeout)
notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self)
def stop(self):
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
notification_center = NotificationCenter()
if self.session is not None:
notification_center.remove_observer(self, sender=self.session)
reactor.callLater(5, self._end_session, self.session)
self.session = None
if self._failure is not None:
notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
else:
notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self)
def _end_session(self, session):
try:
session.end(480)
except Exception:
pass
def _timeout(self):
NotificationCenter().remove_observer(self, sender=self.session)
try:
self.session.end(408)
except Exception:
pass
self.session = None
self._failure = MucInvitationFailure('Timeout', 408)
self.stop()
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidFail(self, notification):
notification.center.remove_observer(self, sender=self.session)
self.session = None
self._failure = MucInvitationFailure(notification.data.reason or notification.data.failure_reason, notification.data.code)
self.stop()
class X2SMucHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity, nickname):
self.sip_identity = sip_identity
self.xmpp_identity = xmpp_identity
self.nickname = nickname
self._xmpp_muc_session = None
self._sip_session = None
self._msrp_stream = None
self._first_stanza = None
self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza
self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza
self._participants = set() # set of (URI, nickname) tuples
self.ended = False
def start(self):
notification_center = NotificationCenter()
self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
notification_center.add_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session.start()
notification_center.post_notification('X2SMucHandlerDidStart', sender=self)
self._start_sip_session()
def end(self):
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_muc_session is not None:
notification_center.remove_observer(self, sender=self._xmpp_muc_session)
# Send indication that the user has been kicked from the room
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('307')
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
self._xmpp_muc_session.end()
self._xmpp_muc_session = None
if self._sip_session is not None:
notification_center.remove_observer(self, sender=self._sip_session)
self._sip_session.end()
self._sip_session = None
self.ended = True
notification_center.post_notification('X2SMucHandlerDidEnd', sender=self)
@run_in_green_thread
def _start_sip_session(self):
# self.xmpp_identity is our local identity
from_uri = self.xmpp_identity.uri.as_sip_uri()
del from_uri.parameters['gr'] # no GRUU in From header
contact_uri = self.xmpp_identity.uri.as_sip_uri()
contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8'))
to_uri = self.sip_identity.uri.as_sip_uri()
lookup = DNSLookup()
settings = SIPSimpleSettings()
account = DefaultAccount()
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = to_uri
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError:
log.warning('DNS lookup error while looking for %s proxy' % uri)
self.end()
return
self._msrp_stream = MediaStreamRegistry.get('chat')()
route = routes.pop(0)
from_header = FromHeader(from_uri)
to_header = ToHeader(to_uri)
contact_header = ContactHeader(contact_uri)
self._sip_session = Session(account)
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._sip_session)
notification_center.add_observer(self, sender=self._msrp_stream)
self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream])
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidStart(self, notification):
log.info("SIP multiparty session %s started" % self._sip_session.call_id)
if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed:
self.end()
return
message_id = self._msrp_stream.set_local_nickname(self.nickname)
self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza)
self._first_stanza = None
def _NH_SIPSessionDidEnd(self, notification):
log.info("SIP multiparty session %s ended" % self._sip_session.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
def _NH_SIPSessionDidFail(self, notification):
log.info("SIP multiparty session %s failed" % self._sip_session.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
def _NH_SIPSessionNewProposal(self, notification):
if notification.data.originator == 'remote':
self._sip_session.reject_proposal()
def _NH_SIPSessionTransferNewIncoming(self, notification):
self._sip_session.reject_transfer(403)
def _NH_SIPSessionGotConferenceInfo(self, notification):
# Translate to XMPP payload
xmpp_manager = XMPPManager()
own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host)
conference_info = notification.data.conference_info
new_participants = set()
for user in conference_info.users:
user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity)
nickname = user.display_text.value if user.display_text else user.entity
new_participants.add((user_uri, nickname))
# Remove participants that are no longer in the room
for uri, nickname in self._participants - new_participants:
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
xmpp_manager.send_muc_stanza(stanza)
# Send presence for current participants
for uri, nickname in new_participants:
if uri == own_uri:
continue
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = Identity(uri)
xmpp_manager.send_muc_stanza(stanza)
self._participants = new_participants
# Send own status last
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('110')
xmpp_manager.send_muc_stanza(stanza)
def _NH_ChatStreamGotMessage(self, notification):
# Notification is sent by the MSRP stream
if not self._xmpp_muc_session:
return
message = notification.data.message
content_type = message.content_type.lower()
if content_type not in ('text/plain', 'text/html'):
return
if content_type == 'text/plain':
html_body = None
body = message.content
else:
html_body = message.content
body = None
resource = message.sender.display_name or str(message.sender.uri)
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource))
self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex)
self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
def _NH_ChatStreamDidSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
self.nickname = nickname
def _NH_ChatStreamDidNotSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)])
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(error_stanza)
def _NH_ChatStreamDidDeliverMessage(self, notification):
# Echo back the message to the sender
stanza = self._pending_messages_map.pop(notification.data.message_id)
stanza.sender, stanza.recipient = stanza.recipient, stanza.sender
stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname)
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
def _NH_ChatStreamDidNotDeliverMessage(self, notification):
self._pending_messages_map.pop(notification.data.message_id)
def _NH_XMPPIncomingMucSessionDidEnd(self, notification):
notification.center.remove_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session = None
self.end()
def _NH_XMPPIncomingMucSessionGotMessage(self, notification):
if not self._sip_session:
return
message = notification.data.message
sender_uri = message.sender.uri.as_sip_uri()
del sender_uri.parameters['gr'] # no GRUU in CPIM From header
sender = ChatIdentity(sender_uri, display_name=self.nickname)
message_id = self._msrp_stream.send_message(message.body, 'text/plain', sender=sender)
self._pending_messages_map[message_id] = message
# Message will be echoed back to the sender on ChatStreamDidDeliverMessage
def _NH_XMPPIncomingMucSessionChangedNickname(self, notification):
if not self._sip_session:
return
nickname = notification.data.nickname
try:
message_id = self._msrp_stream.set_local_nickname(nickname)
except ChatStreamError:
return
self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza)
diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py
index 20b7b77..db3a3a8 100644
--- a/sylk/applications/xmppgateway/presence.py
+++ b/sylk/applications/xmppgateway/presence.py
@@ -1,520 +1,520 @@
import hashlib
import random
from application.notification import IObserver, NotificationCenter
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError
from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader
from sipsimple.core import Subscription
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import pidf, rpid, caps
from sipsimple.payloads import ParserError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from sipsimple.util import ISOTimestamp
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.util import format_uri
from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence
from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription
from sylk.configuration import SIPConfig
__all__ = ['S2XPresenceHandler', 'X2SPresenceHandler']
class S2XPresenceHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
self.ended = False
self._sip_subscriptions = []
self._stanza_cache = {}
self._pidf = None
self._xmpp_subscription = None
self.sip_identity = sip_identity
self.xmpp_identity = xmpp_identity
def start(self):
notification_center = NotificationCenter()
self._xmpp_subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
notification_center.add_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.start()
notification_center.post_notification('S2XPresenceHandlerDidStart', sender=self)
def end(self):
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_subscription is not None:
notification_center.remove_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.end()
self._xmpp_subscription = None
while self._sip_subscriptions:
subscription = self._sip_subscriptions.pop()
notification_center.remove_observer(self, sender=subscription)
try:
subscription.end()
except SIPCoreError:
pass
self.ended = True
notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self)
def add_sip_subscription(self, subscription):
# If s subscription is received after the handle has ended but before
# S2XPresenceHandlerDidEnd has been processed we need to ignore it and wait for a retransmission
# which we will handle by creating a new S2XPresenceHandler
if self.ended:
return
self._sip_subscriptions.append(subscription)
NotificationCenter().add_observer(self, sender=subscription)
if self._xmpp_subscription.state == 'active':
pidf_doc = self._pidf
content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None
try:
subscription.accept(content_type, pidf_doc)
- except SIPCoreError, e:
+ except SIPCoreError as e:
log.warning('Error accepting SIP subscription: %s' % e)
subscription.end()
else:
try:
subscription.accept_pending()
- except SIPCoreError, e:
+ except SIPCoreError as e:
log.warning('Error accepting SIP subscription: %s' % e)
subscription.end()
if XMPPGatewayConfig.log_presence:
log.info('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _build_pidf(self):
if not self._stanza_cache:
self._pidf = None
return None
pidf_doc = pidf.PIDF(str(self.xmpp_identity))
uri = next(self._stanza_cache.iterkeys())
person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest())
person.activities = rpid.Activities()
pidf_doc.add(person)
for stanza in self._stanza_cache.itervalues():
if not stanza.available:
status = pidf.Status('closed')
status.extended = 'offline'
else:
status = pidf.Status('open')
if stanza.show == 'away':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'xa':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'dnd':
status.extended = 'busy'
if 'busy' not in person.activities:
person.activities.add('busy')
else:
status.extended = 'available'
if stanza.sender.uri.resource:
resource = encode_resource(stanza.sender.uri.resource)
else:
# Workaround for clients not sending the resource under certain (unknown) circumstances
resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()
service_id = "SID-%s" % resource
sip_uri = stanza.sender.uri.as_sip_uri()
sip_uri.parameters['gr'] = resource
sip_uri.parameters['xmpp'] = None
contact = pidf.Contact(str(sip_uri))
service = pidf.Service(service_id, status=status, contact=contact)
service.add(pidf.DeviceID(resource))
service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource)
service.timestamp = pidf.ServiceTimestamp(stanza.timestamp)
service.capabilities = caps.ServiceCapabilities(text=True, message=True)
for lang, note in stanza.statuses.iteritems():
service.notes.add(pidf.PIDFNote(note, lang=lang))
pidf_doc.add(service)
if not person.activities:
person.activities = None
self._pidf = pidf_doc.toxml()
return self._pidf
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
subscription = notification.sender
notification.center.remove_observer(self, sender=subscription)
self._sip_subscriptions.remove(subscription)
if XMPPGatewayConfig.log_presence:
log.info('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
if not self._sip_subscriptions:
self.end()
def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification):
if XMPPGatewayConfig.log_presence:
log.info('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), notification.data.code, notification.data.reason))
def _NH_SIPIncomingSubscriptionGotUnsubscribe(self, notification):
if XMPPGatewayConfig.log_presence:
log.info('SIP subscription from %s to %s was terminated by user for presence flow 1x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _NH_SIPIncomingSubscriptionGotRefreshingSubscribe(self, notification):
if XMPPGatewayConfig.log_presence:
log.info('SIP subscription from %s to %s was refreshed for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _NH_SIPIncomingSubscriptionDidTimeout(self, notification):
if XMPPGatewayConfig.log_presence:
log.info('SIP subscription from %s to %s timed out for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _NH_XMPPSubscriptionChangedState(self, notification):
if notification.data.prev_state == 'subscribe_sent' and notification.data.state == 'active':
pidf_doc = self._pidf
content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None
for subscription in (subscription for subscription in self._sip_subscriptions if subscription.state == 'pending'):
subscription.accept(content_type, pidf_doc)
def _NH_XMPPSubscriptionGotNotify(self, notification):
stanza = notification.data.presence
self._stanza_cache[stanza.sender.uri] = stanza
stanza.timestamp = ISOTimestamp.now() # TODO: mirror the one in the stanza, if present
pidf_doc = self._build_pidf()
if XMPPGatewayConfig.log_presence:
log.info('XMPP notification from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self)))
for subscription in self._sip_subscriptions:
try:
subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc)
- except SIPCoreError, e:
+ except SIPCoreError as e:
if XMPPGatewayConfig.log_presence:
log.info('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e))
if not stanza.available:
# Only inform once about this device being unavailable
del self._stanza_cache[stanza.sender.uri]
def _NH_XMPPSubscriptionDidFail(self, notification):
notification.center.remove_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription = None
self.end()
_NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail
class InterruptSubscription(Exception): pass
class TerminateSubscription(Exception): pass
class SubscriptionError(Exception):
def __init__(self, error, timeout, refresh_interval=None, fatal=False):
self.error = error
self.refresh_interval = refresh_interval
self.timeout = timeout
self.fatal = fatal
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class X2SPresenceHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
self.ended = False
self.sip_identity = sip_identity
self.xmpp_identity = xmpp_identity
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._sip_subscription = None
self._sip_subscription_proc = None
self._sip_subscription_timer = None
self._xmpp_subscription = None
def start(self):
notification_center = NotificationCenter()
self._xmpp_subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
notification_center.add_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.start()
self._command_proc = proc.spawn(self._run)
self._subscribe_sip()
notification_center.post_notification('X2SPresenceHandlerDidStart', sender=self)
def end(self):
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_subscription is not None:
notification_center.remove_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.end()
self._xmpp_subscription = None
if self._sip_subscription:
self._unsubscribe_sip()
self.ended = True
notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self)
@run_in_green_thread
def _subscribe_sip(self):
command = Command('subscribe')
self._command_channel.send(command)
@run_in_green_thread
def _unsubscribe_sip(self):
command = Command('unsubscribe')
self._command_channel.send(command)
command.wait()
self._command_proc.kill()
self._command_proc = None
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_subscribe(self, command):
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._sip_subscription_proc = None
command.signal()
def _process_pidf(self, body):
try:
pidf_doc = pidf.PIDF.parse(body)
- except ParserError, e:
+ except ParserError as e:
log.warn('Error parsing PIDF document: %s' % e)
return
# Build XML stanzas out of PIDF documents
try:
person = next(p for p in pidf_doc.persons)
except StopIteration:
person = None
for service in pidf_doc.services:
sip_contact = self.sip_identity.uri.as_sip_uri()
if service.device_info is not None:
sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id
else:
sip_contact.parameters['gr'] = service.id
sender = Identity(FrozenURI.parse(sip_contact))
if service.status.extended is not None:
available = service.status.extended != 'offline'
else:
available = service.status.basic == 'open'
stanza = AvailabilityPresence(sender, self.xmpp_identity, available)
for note in service.notes:
stanza.statuses[note.lang] = note
if service.status.extended is not None:
if service.status.extended == 'away':
stanza.show = 'away'
elif service.status.extended == 'busy':
stanza.show = 'dnd'
elif person is not None and person.activities is not None:
activities = set(list(person.activities))
if 'away' in activities:
stanza.show = 'away'
elif {'holiday', 'vacation'}.intersection(activities):
stanza.show = 'xa'
elif 'busy' in activities:
stanza.show = 'dnd'
self._xmpp_subscription.send_presence(stanza)
def _sip_subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
account = DefaultAccount()
refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval
try:
# Lookup routes
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
- except DNSLookupError, e:
+ except DNSLookupError as e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
subscription_uri = self.sip_identity.uri.as_sip_uri()
subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
'presence',
RouteHeader(route.uri),
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError(error='Internal error', timeout=5)
self._sip_subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
- except SIPSubscriptionDidFail, e:
+ except SIPSubscriptionDidFail as e:
notification_center.remove_observer(self, sender=subscription)
self._sip_subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120))
elif e.data.code == 403:
# Forbidden
raise SubscriptionError(error='Forbidden', timeout=None, fatal=True)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
interval = e.data.min_expires
else:
interval = None
raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True)
elif e.data.code == 1400:
raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, give up
raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
if notification.sender is not self._sip_subscription:
continue
if self._xmpp_subscription is None:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'presence':
subscription_state = notification.data.headers.get('Subscription-State').state
if subscription_state == 'active' and self._xmpp_subscription.state != 'active':
self._xmpp_subscription.accept()
elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active':
# The state went from active to pending, hide the presence state?
pass
if notification.data.body:
if XMPPGatewayConfig.log_presence:
log.info('SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp')))
self._process_pidf(notification.data.body)
elif notification.name == 'SIPSubscriptionDidEnd':
break
- except SIPSubscriptionDidFail, e:
+ except SIPSubscriptionDidFail as e:
if e.data.code == 0 and e.data.reason == 'rejected':
self._xmpp_subscription.reject()
else:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._sip_subscription)
- except InterruptSubscription, e:
+ except InterruptSubscription as e:
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
notification_center.remove_observer(self, sender=self._sip_subscription)
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
- except TerminateSubscription, e:
+ except TerminateSubscription as e:
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._sip_subscription)
- except SubscriptionError, e:
+ except SubscriptionError as e:
if not e.fatal:
self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval))
finally:
self.subscribed = False
self._sip_subscription = None
self._sip_subscription_proc = None
reactor.callLater(0, self.end)
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification):
self.end()
def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification):
if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active':
self._xmpp_subscription.accept()
_NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe
diff --git a/sylk/applications/xmppgateway/xmpp/jingle/session.py b/sylk/applications/xmppgateway/xmpp/jingle/session.py
index e16fe69..59b1181 100644
--- a/sylk/applications/xmppgateway/xmpp/jingle/session.py
+++ b/sylk/applications/xmppgateway/xmpp/jingle/session.py
@@ -1,806 +1,806 @@
import random
import string
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from cStringIO import StringIO
from datetime import datetime
from eventlib import api, coros, proc
from eventlib.twistedutil import block_on
from lxml import etree
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SDPSession, SDPMediaStream, SDPConnection, SDPNegotiator
from sipsimple.core import SIPCoreError
from sipsimple.threading import run_in_twisted_thread
from twisted.internet import reactor
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import TimeoutError as IqTimeoutError
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI
from sylk.applications.xmppgateway.xmpp.jingle.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sylk.applications.xmppgateway.xmpp.jingle.util import jingle_to_sdp, sdp_to_jingle
from sylk.applications.xmppgateway.xmpp.stanzas import jingle
from sylk.configuration import SIPConfig
def random_id():
return ''.join(random.choice(string.ascii_letters+string.digits) for x in xrange(32))
class MediaStreamDidFailError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class MediaStreamDidNotInitializeError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class Operation(object):
__params__ = ()
def __init__(self, **params):
for name, value in params.iteritems():
setattr(self, name, value)
for param in set(self.__params__).difference(params):
raise ValueError("missing operation parameter: '%s'" % param)
self.channel = coros.queue()
class AcceptOperation(Operation):
__params__ = ('streams', 'is_focus')
class SendRingIndicationOperation(Operation):
__params__ = ()
class RejectOperation(Operation):
__params__ = ('reason',)
class EndOperation(Operation):
__params__ = ()
class HoldOperation(Operation):
__params__ = ()
class UnholdOperation(Operation):
__params__ = ()
class ProcessRemoteOperation(Operation):
__params__ = ('notification',)
class ConnectOperation(Operation):
__params__ = ('sender', 'recipient', 'streams', 'is_focus')
class SendConferenceInfoOperation(Operation):
__params__ = ('xml',)
class JingleSession(object):
implements(IObserver)
jingle_stanza_timeout = 3
media_stream_timeout = 15
def __init__(self, protocol):
self.account = DefaultAccount()
self._protocol = protocol
self._id = None
self._local_identity = None
self._remote_identity = None
self._local_jid = None
self._remote_jid = None
self._channel = coros.queue()
self._current_operation = None
self._proc = proc.spawn(self._run)
self._timer = None
self._sdp_negotiator = None
self._pending_transport_info_stanzas = []
self.direction = None
self.state = None
self.streams = None
self.proposed_streams = None
self.start_time = None
self.end_time = None
self.on_hold = False
self.local_focus = False
def init_incoming(self, stanza):
self._id = stanza.jingle.sid
self._local_identity = Identity(FrozenURI.parse(stanza.recipient))
self._remote_identity = Identity(FrozenURI.parse(stanza.sender))
self._local_jid = self._local_identity.uri.as_xmpp_jid()
self._remote_jid = self._remote_identity.uri.as_xmpp_jid()
remote_sdp = jingle_to_sdp(stanza.jingle)
try:
self._sdp_negotiator = SDPNegotiator.create_with_remote_offer(remote_sdp)
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail(originator='local', reason='general-error', description=str(e))
return
self.proposed_streams = []
for index, media_stream in enumerate(remote_sdp.media):
if media_stream.port != 0:
for stream_type in MediaStreamRegistry:
try:
stream = stream_type.new_from_sdp(self, remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self.direction = 'incoming'
self.state = 'incoming'
NotificationCenter().post_notification('JingleSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams))
else:
self._fail(originator='local', reason='unsupported-applications')
def connect(self, sender_identity, recipient_identity, streams, is_focus=False):
self._schedule_operation(ConnectOperation(sender=sender_identity, recipient=recipient_identity, streams=streams, is_focus=is_focus))
def send_ring_indication(self):
self._schedule_operation(SendRingIndicationOperation())
def accept(self, streams, is_focus=False):
self._schedule_operation(AcceptOperation(streams=streams, is_focus=is_focus))
def reject(self, reason='busy'):
self._schedule_operation(RejectOperation(reason=reason))
def hold(self):
self._schedule_operation(HoldOperation())
def unhold(self):
self._schedule_operation(UnholdOperation())
def end(self):
self._schedule_operation(EndOperation())
def add_stream(self):
raise NotImplementedError
def remove_stream(self):
raise NotImplementedError
@property
def id(self):
return self._id
@property
def local_identity(self):
return self._local_identity
@property
def remote_identity(self):
return self._remote_identity
@run_in_twisted_thread
def _send_conference_info(self, xml):
# This function is not meant for users to call, entities with knowledge about JingleSession
# internals will call it, such as the MediaSessionHandler
self._schedule_operation(SendConferenceInfoOperation(xml=xml))
def _send_stanza(self, stanza):
if self.direction == 'incoming':
stanza.jingle.initiator = unicode(self._remote_jid)
stanza.jingle.responder = unicode(self._local_jid)
else:
stanza.jingle.initiator = unicode(self._local_jid)
stanza.jingle.responder = unicode(self._remote_jid)
stanza.timeout = self.jingle_stanza_timeout
return self._protocol.request(stanza)
def _fail(self, originator='local', reason='general-error', description=None):
reason = jingle.Reason(jingle.ReasonType(reason), text=description)
stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason)
self._send_stanza(stanza)
self.state = 'terminated'
failure_str = '%s%s' % (reason, ' %s' % description if description else '')
NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=failure_str))
self._channel.send_exception(proc.ProcExit)
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_RTPStreamDidEnableEncryption(self, notification):
if notification.sender.type != 'audio':
return
audio_stream = notification.sender
if audio_stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
video_stream = next(stream for stream in self.streams or [] if stream.type=='video')
except StopIteration:
return
if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active:
video_stream.encryption.zrtp._enable(audio_stream)
def _NH_MediaStreamDidStart(self, notification):
stream = notification.sender
if stream.type == 'audio' and stream.encryption.type == 'ZRTP':
stream.encryption.zrtp._enable()
elif stream.type == 'video' and stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio')
except StopIteration:
pass
else:
if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active:
stream.encryption.zrtp._enable(audio_stream)
if self._current_operation is not None:
self._current_operation.channel.send(notification)
def _NH_MediaStreamDidInitialize(self, notification):
if self._current_operation is not None:
self._current_operation.channel.send(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
if self._current_operation is not None:
self._current_operation.channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data))
def _NH_MediaStreamDidFail(self, notification):
if self._current_operation is not None:
self._current_operation.channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data))
else:
self.end()
def _NH_XMPPGotJingleSessionAccept(self, notification):
self._schedule_operation(ProcessRemoteOperation(notification=notification))
def _NH_XMPPGotJingleSessionTerminate(self, notification):
self._schedule_operation(ProcessRemoteOperation(notification=notification))
def _NH_XMPPGotJingleSessionInfo(self, notification):
self._schedule_operation(ProcessRemoteOperation(notification=notification))
def _NH_XMPPGotJingleDescriptionInfo(self, notification):
self._schedule_operation(ProcessRemoteOperation(notification=notification))
def _NH_XMPPGotJingleTransportInfo(self, notification):
self._schedule_operation(ProcessRemoteOperation(notification=notification))
# Operation handling
@run_in_twisted_thread
def _schedule_operation(self, operation):
self._channel.send(operation)
def _run(self):
while True:
self._current_operation = op = self._channel.wait()
try:
handler = getattr(self, '_OH_%s' % op.__class__.__name__)
handler(op)
except BaseException:
self._proc = None
raise
finally:
self._current_operation = None
def _OH_AcceptOperation(self, operation):
if self.state != 'incoming':
return
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
streams = operation.streams
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
remote_sdp = self._sdp_negotiator.current_remote
local_ip = SIPConfig.local_ip.normalized
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
else:
media = SDPMediaStream.new(media)
media.port = 0
media.attributes = []
local_sdp.media.append(media)
try:
self._sdp_negotiator.set_local_answer(local_sdp)
self._sdp_negotiator.negotiate()
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail(originator='local', reason='incompatible-parameters', description=str(e))
return
self.local_focus = operation.is_focus
notification_center.post_notification('JingleSessionWillStart', sender=self)
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
# Build the payload and send it over
payload = sdp_to_jingle(local_sdp)
payload.sid = self._id
if self.local_focus:
payload.conference_info = jingle.ConferenceInfo(True)
stanza = self._protocol.sessionAccept(self._local_jid, self._remote_jid, payload)
d = self._send_stanza(stanza)
block_on(d)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
- except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError, IqTimeoutError, StanzaError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
elif isinstance(e, IqTimeoutError):
error = 'timeout sending IQ stanza'
elif isinstance(e, StanzaError):
error = str(e.condition)
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification_center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
def _OH_ConnectOperation(self, operation):
if self.state is not None:
return
settings = SIPSimpleSettings()
notification_center = NotificationCenter()
self.direction = 'outgoing'
self.state = 'connecting'
self.proposed_streams = operation.streams
self.local_focus = operation.is_focus
self._id = random_id()
self._local_identity = operation.sender
self._remote_identity = operation.recipient
self._local_jid = self._local_identity.uri.as_xmpp_jid()
self._remote_jid = self._remote_identity.uri.as_xmpp_jid()
notification_center.post_notification('JingleSessionNewOutgoing', self, NotificationData(streams=operation.streams))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
# Build local SDP and negotiator
local_ip = SIPConfig.local_ip.normalized
local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
local_sdp.media.append(media)
self._sdp_negotiator = SDPNegotiator.create_with_local_offer(local_sdp)
# Build the payload and send it over
payload = sdp_to_jingle(local_sdp)
payload.sid = self._id
if self.local_focus:
payload.conference_info = jingle.ConferenceInfo(True)
stanza = self._protocol.sessionInitiate(self._local_jid, self._remote_jid, payload)
d = self._send_stanza(stanza)
block_on(d)
- except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, IqTimeoutError, StanzaError, SIPCoreError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, IqTimeoutError):
error = 'timeout sending IQ stanza'
elif isinstance(e, StanzaError):
error = str(e.condition)
elif isinstance(e, SIPCoreError):
error = str(e)
else:
error = 'media stream failed: %s' % e.data.reason
self.state = 'terminated'
NotificationCenter().post_notification('JingleSessionDidFail', sender=self, data=NotificationData(originator='local', reason=error))
self._channel.send_exception(proc.ProcExit)
else:
self._timer = reactor.callLater(settings.sip.invite_timeout, self.end)
def _OH_RejectOperation(self, operation):
if self.state != 'incoming':
return
reason = jingle.Reason(jingle.ReasonType(operation.reason))
stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason)
self._send_stanza(stanza)
self.state = 'terminated'
self._channel.send_exception(proc.ProcExit)
def _OH_EndOperation(self, operation):
if self.state not in ('connecting', 'connected'):
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
prev_state = self.state
self.state = 'terminating'
notification_center = NotificationCenter()
notification_center.post_notification('JingleSessionWillEnd', self)
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
if prev_state == 'connected':
reason = jingle.Reason(jingle.ReasonType('success'))
else:
reason = jingle.Reason(jingle.ReasonType('cancel'))
stanza = self._protocol.sessionTerminate(self._local_jid, self._remote_jid, self._id, reason)
self._send_stanza(stanza)
self.state = 'terminated'
if prev_state == 'connected':
self.end_time = datetime.now()
notification_center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='local'))
else:
notification_center.post_notification('JingleSessionDidFail', self, NotificationData(originator='local', reason='cancel'))
for stream in streams:
stream.end()
self._channel.send_exception(proc.ProcExit)
def _OH_SendRingIndicationOperation(self, operation):
if self.state != 'incoming':
return
stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('ringing'))
self._send_stanza(stanza)
def _OH_HoldOperation(self, operation):
if self.state != 'connected':
return
if self.on_hold:
return
self.on_hold = True
for stream in self.streams:
stream.hold()
stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('hold'))
self._send_stanza(stanza)
NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=False))
def _OH_UnholdOperation(self, operation):
if self.state != 'connected':
return
if not self.on_hold:
return
self.on_hold = False
for stream in self.streams:
stream.unhold()
stanza = self._protocol.sessionInfo(self._local_jid, self._remote_jid, self._id, jingle.Info('unhold'))
self._send_stanza(stanza)
NotificationCenter().post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False))
def _OH_SendConferenceInfoOperation(self, operation):
if self.state != 'connected':
return
if not self.local_focus:
return
tree = etree.parse(StringIO(operation.xml))
tree.getroot().attrib['sid'] = self._id # FIXME: non-standard, but Jitsi does it
data = etree.tostring(tree, xml_declaration=False) # Strip the XML heading
stanza = jingle.ConferenceInfoIq(sender=self._local_jid, recipient=self._remote_jid, payload=data)
stanza.timeout = self.jingle_stanza_timeout
self._protocol.request(stanza)
def _OH_ProcessRemoteOperation(self, operation):
notification = operation.notification
stanza = notification.data.stanza
if notification.name == 'XMPPGotJingleSessionTerminate':
if self.state not in ('incoming', 'connecting', 'connected_pending_accept', 'connected'):
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
# Session ended remotely
prev_state = self.state
self.state = 'terminated'
if prev_state == 'incoming':
reason = stanza.jingle.reason.value if stanza.jingle.reason else 'cancel'
notification.center.post_notification('JingleSessionDidFail', self, NotificationData(originator='remote', reason=reason))
else:
notification.center.post_notification('JingleSessionWillEnd', self, NotificationData(originator='remote'))
streams = self.proposed_streams if prev_state == 'connecting' else self.streams
for stream in streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.end_time = datetime.now()
notification.center.post_notification('JingleSessionDidEnd', self, NotificationData(originator='remote'))
self._channel.send_exception(proc.ProcExit)
elif notification.name == 'XMPPGotJingleSessionInfo':
info = stanza.jingle.info
if not info:
return
if info == 'ringing':
if self.state not in ('connecting', 'connected_pending_accept'):
return
notification.center.post_notification('JingleSessionGotRingIndication', self)
elif info in ('hold', 'unhold'):
if self.state != 'connected':
return
notification.center.post_notification('JingleSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=info=='hold', partial=False))
elif notification.name == 'XMPPGotJingleDescriptionInfo':
if self.state != 'connecting':
return
# Add candidates acquired on transport-info stanzas
for s in self._pending_transport_info_stanzas:
for c in s.jingle.content:
content = next(content for content in stanza.jingle.content if content.name == c.name)
content.transport.candidates.extend(c.transport.candidates)
if isinstance(content.transport, jingle.IceUdpTransport):
if not content.transport.ufrag and c.transport.ufrag:
content.transport.ufrag = c.transport.ufrag
if not content.transport.password and c.transport.password:
content.transport.password = c.transport.password
remote_sdp = jingle_to_sdp(stanza.jingle)
try:
self._sdp_negotiator.set_remote_answer(remote_sdp)
self._sdp_negotiator.negotiate()
except SIPCoreError:
# The description-info stanza may have been just a parameter change, not a full 'SDP'
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
del self._pending_transport_info_stanzas[:]
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
notification.center.post_notification('JingleSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
try:
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
- except (MediaStreamDidFailError, api.TimeoutError), e:
+ except (MediaStreamDidFailError, api.TimeoutError) as e:
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
self.state = 'connected_pending_accept'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
# Hold the streams to prevent real RTP from flowing
for stream in self.streams:
stream.hold()
elif notification.name == 'XMPPGotJingleSessionAccept':
if self.state not in ('connecting', 'connected_pending_accept'):
return
if self._timer is not None and self._timer.active():
self._timer.cancel()
self._timer = None
if self.state == 'connected_pending_accept':
# We already negotiated ICE and media is 'flowing' (not really because streams are on hold)
# unhold the streams and pretend the session just started
for stream in self.streams:
stream.unhold()
self.state = 'connected'
notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
return
# Add candidates acquired on transport-info stanzas
for s in self._pending_transport_info_stanzas:
for c in s.jingle.content:
content = next(content for content in stanza.jingle.content if content.name == c.name)
content.transport.candidates.extend(c.transport.candidates)
if isinstance(content.transport, jingle.IceUdpTransport):
if not content.transport.ufrag and c.transport.ufrag:
content.transport.ufrag = c.transport.ufrag
if not content.transport.password and c.transport.password:
content.transport.password = c.transport.password
del self._pending_transport_info_stanzas[:]
remote_sdp = jingle_to_sdp(stanza.jingle)
try:
self._sdp_negotiator.set_remote_answer(remote_sdp)
self._sdp_negotiator.negotiate()
- except SIPCoreError, e:
+ except SIPCoreError as e:
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', reason='incompatible-parameters', description=str(e))
return
# Get active SDPs (negotiator may make changes)
local_sdp = self._sdp_negotiator.active_local
remote_sdp = self._sdp_negotiator.active_remote
notification.center.post_notification('JingleSessionWillStart', sender=self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
try:
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = operation.channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
- except (MediaStreamDidFailError, api.TimeoutError), e:
+ except (MediaStreamDidFailError, api.TimeoutError) as e:
for stream in self.proposed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', reason='failed-application', description=error)
else:
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = datetime.now()
notification.center.post_notification('JingleSessionDidStart', self, NotificationData(streams=self.streams))
elif notification.name == 'XMPPGotJingleTransportInfo':
if self.state != 'connecting':
# ICE trickling not supported yet, so only accept candidates before accept
return
self._pending_transport_info_stanzas.append(stanza)
class JingleSessionManager(object):
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
self.sessions = {}
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, name='JingleSessionNewIncoming')
notification_center.add_observer(self, name='JingleSessionNewOutgoing')
notification_center.add_observer(self, name='JingleSessionDidFail')
notification_center.add_observer(self, name='JingleSessionDidEnd')
def stop(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='JingleSessionNewIncoming')
notification_center.remove_observer(self, name='JingleSessionNewOutgoing')
notification_center.remove_observer(self, name='JingleSessionDidFail')
notification_center.remove_observer(self, name='JingleSessionDidEnd')
def handle_notification(self, notification):
if notification.name in ('JingleSessionNewIncoming', 'JingleSessionNewOutgoing'):
session = notification.sender
self.sessions[session.id] = session
elif notification.name in ('JingleSessionDidFail', 'JingleSessionDidEnd'):
session = notification.sender
del self.sessions[session.id]
diff --git a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
index 1cdd6cc..f66fb7d 100644
--- a/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
+++ b/sylk/applications/xmppgateway/xmpp/jingle/streams/rtp.py
@@ -1,439 +1,439 @@
"""
Handling of RTP media streams according to RFC3550, RFC3605, RFC3581,
RFC2833 and RFC3711, RFC3489 and draft-ietf-mmusic-ice-19.
"""
__all__ = ['AudioStream']
from threading import RLock
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from zope.interface import implements
from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioTransport, PJSIPError, RTPTransport, SIPCoreError
from sipsimple.streams.rtp import RTPStreamEncryption
from sylk.applications.xmppgateway.xmpp.jingle.streams import IMediaStream, InvalidStreamError, MediaStreamRegistrar, UnknownStreamError
class AudioStream(object):
__metaclass__ = MediaStreamRegistrar
implements(IMediaStream, IAudioPort, IObserver)
type = 'audio'
priority = 1
hold_supported = True
def __init__(self):
from sipsimple.application import SIPApplication
self.mixer = SIPApplication.voice_audio_mixer
self.bridge = AudioBridge(self.mixer)
self.device = AudioDevice(self.mixer)
self.notification_center = NotificationCenter()
self.on_hold_by_local = False
self.on_hold_by_remote = False
self.direction = None
self.state = 'NULL'
self._transport = None
self._hold_request = None
self._ice_state = 'NULL'
self._lock = RLock()
self._rtp_transport = None
self.session = None
self.encryption = RTPStreamEncryption(self)
self._srtp_encryption = None
self._try_ice = False
self._initialized = False
self._done = False
self._failure_reason = None
self.bridge.add(self.device)
# Audio properties
#
@property
def codec(self):
return self._transport.codec if self._transport else None
@property
def consumer_slot(self):
return self._transport.slot if self._transport else None
@property
def producer_slot(self):
return self._transport.slot if self._transport and not self.muted else None
@property
def sample_rate(self):
return self._transport.sample_rate if self._transport else None
@property
def statistics(self):
return self._transport.statistics if self._transport else None
def _get_muted(self):
return self.__dict__.get('muted', False)
def _set_muted(self, value):
if not isinstance(value, bool):
raise ValueError('illegal value for muted property: %r' % (value,))
if value == self.muted:
return
old_producer_slot = self.producer_slot
self.__dict__['muted'] = value
notification_center = NotificationCenter()
data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)
notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data)
muted = property(_get_muted, _set_muted)
del _get_muted, _set_muted
# RTP properties
#
@property
def local_rtp_address(self):
return self._rtp_transport.local_rtp_address if self._rtp_transport else None
@property
def local_rtp_port(self):
return self._rtp_transport.local_rtp_port if self._rtp_transport else None
@property
def remote_rtp_address(self):
if self._ice_state == 'IN_USE':
return self._rtp_transport.remote_rtp_address_received if self._rtp_transport else None
else:
return self._rtp_transport.remote_rtp_address_sdp if self._rtp_transport else None
@property
def remote_rtp_port(self):
if self._ice_state == 'IN_USE':
return self._rtp_transport.remote_rtp_port_received if self._rtp_transport else None
else:
return self._rtp_transport.remote_rtp_port_sdp if self._rtp_transport else None
@property
def local_rtp_candidate_type(self):
return self._rtp_transport.local_rtp_candidate_type if self._rtp_transport else None
@property
def remote_rtp_candidate_type(self):
return self._rtp_transport.remote_rtp_candidate_type if self._rtp_transport else None
@property
def ice_active(self):
return self._ice_state == 'IN_USE'
# Generic properties
#
@property
def on_hold(self):
return self.on_hold_by_local or self.on_hold_by_remote
# Public methods
#
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
# TODO: actually validate the SDP
settings = SIPSimpleSettings()
remote_stream = remote_sdp.media[stream_index]
if remote_stream.media != 'audio':
raise UnknownStreamError
if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'):
raise InvalidStreamError('expected RTP/AVP or RTP/SAVP transport in audio stream, got %s' % remote_stream.transport)
local_encryption_policy = 'sdes_optional'
if local_encryption_policy == 'sdes_mandatory' and not 'crypto' in remote_stream.attributes:
raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled")
if remote_stream.transport == 'RTP/SAVP' and 'crypto' in remote_stream.attributes and local_encryption_policy not in ('opportunistic', 'sdes_optional', 'sdes_mandatory'):
raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled")
supported_codecs = session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list
if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs):
raise InvalidStreamError('no compatible codecs found')
stream = cls()
stream._incoming_remote_sdp = remote_sdp
stream._incoming_stream_index = stream_index
if 'zrtp-hash' in remote_stream.attributes:
stream._incoming_stream_encryption = 'zrtp'
elif 'crypto' in remote_stream.attributes:
stream._incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional'
else:
stream._incoming_stream_encryption = None
return stream
def initialize(self, session, direction):
with self._lock:
if self.state != 'NULL':
raise RuntimeError('AudioStream.initialize() may only be called in the NULL state')
self.state = 'INITIALIZING'
self.session = session
local_encryption_policy = 'sdes_optional'
if hasattr(self, '_incoming_remote_sdp'):
# ICE attributes could come at the session level or at the media level
remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index]
self._try_ice = (remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates
if self._incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic':
self._srtp_encryption = self._incoming_stream_encryption
else:
self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy
del self._incoming_stream_encryption
else:
self._try_ice = True
self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy
self._init_rtp_transport()
def get_local_media(self, remote_sdp=None, index=0):
with self._lock:
if self.state not in ['INITIALIZED', 'WAIT_ICE', 'ESTABLISHED']:
raise RuntimeError('AudioStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states')
if remote_sdp is None:
# offer
old_direction = self._transport.direction
if old_direction is None:
new_direction = 'sendrecv'
elif 'send' in old_direction:
new_direction = ('sendonly' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'sendrecv')
else:
new_direction = ('inactive' if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else 'recvonly')
else:
new_direction = None
return self._transport.get_local_media(remote_sdp, index, new_direction)
def start(self, local_sdp, remote_sdp, stream_index):
with self._lock:
if self.state != 'INITIALIZED':
raise RuntimeError('AudioStream.start() may only be called in the INITIALIZED state')
settings = SIPSimpleSettings()
self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout)
self._check_hold(self._transport.direction, True)
if self._try_ice:
self.state = 'WAIT_ICE'
else:
self.state = 'ESTABLISHED'
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
def validate_update(self, remote_sdp, stream_index):
with self._lock:
# TODO: implement
return True
def update(self, local_sdp, remote_sdp, stream_index):
with self._lock:
connection = remote_sdp.media[stream_index].connection or remote_sdp.connection
if not self._rtp_transport.ice_active and (connection.address != self._rtp_transport.remote_rtp_address_sdp or self._rtp_transport.remote_rtp_port_sdp != remote_sdp.media[stream_index].port):
settings = SIPSimpleSettings()
old_consumer_slot = self.consumer_slot
old_producer_slot = self.producer_slot
self.notification_center.remove_observer(self, sender=self._transport)
self._transport.stop()
try:
self._transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
- except SIPCoreError, e:
+ except SIPCoreError as e:
self.state = 'ENDED'
self._failure_reason = e.args[0]
self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(reason=self._failure_reason))
return
self.notification_center.add_observer(self, sender=self._transport)
self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout)
self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True,
old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot,
old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot))
if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv':
self._transport.update_direction('recvonly')
self._check_hold(self._transport.direction, False)
self.notification_center.post_notification('RTPStreamDidChangeRTPParameters', sender=self)
else:
new_direction = local_sdp.media[stream_index].direction
self._transport.update_direction(new_direction)
self._check_hold(new_direction, False)
self._hold_request = None
def hold(self):
with self._lock:
if self.on_hold_by_local or self._hold_request == 'hold':
return
if self.state == 'ESTABLISHED' and self.direction != 'inactive':
self.bridge.remove(self)
self._hold_request = 'hold'
def unhold(self):
with self._lock:
if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold':
return
if self.state == 'ESTABLISHED' and self._hold_request == 'hold':
self.bridge.add(self)
self._hold_request = None if self._hold_request == 'hold' else 'unhold'
def deactivate(self):
with self._lock:
self.bridge.stop()
def end(self):
with self._lock:
if not self._initialized or self._done:
return
self._done = True
self.notification_center.post_notification('MediaStreamWillEnd', sender=self)
if self._transport is not None:
self._transport.stop()
self.notification_center.remove_observer(self, sender=self._transport)
self._transport = None
self.notification_center.remove_observer(self, sender=self._rtp_transport)
self._rtp_transport = None
self.state = 'ENDED'
self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason))
self.session = None
def reset(self, stream_index):
with self._lock:
if self.direction == 'inactive' and not self.on_hold_by_local:
new_direction = 'sendrecv'
self._transport.update_direction(new_direction)
self._check_hold(new_direction, False)
# TODO: do a full reset, re-creating the AudioTransport, so that a new offer
# would contain all codecs and ICE would be renegotiated -Saul
def send_dtmf(self, digit):
with self._lock:
if self.state != 'ESTABLISHED':
raise RuntimeError('AudioStream.send_dtmf() cannot be used in %s state' % self.state)
try:
self._transport.send_dtmf(digit)
- except PJSIPError, e:
+ except PJSIPError as e:
if not e.args[0].endswith('(PJ_ETOOMANY)'):
raise
# Notification handling
#
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_RTPTransportDidFail(self, notification):
with self._lock:
self.notification_center.remove_observer(self, sender=notification.sender)
if self.state == 'ENDED':
return
self._try_next_rtp_transport(notification.data.reason)
def _NH_RTPTransportDidInitialize(self, notification):
settings = SIPSimpleSettings()
rtp_transport = notification.sender
with self._lock:
if self.state == 'ENDED':
return
del self._rtp_args
del self._stun_servers
try:
if hasattr(self, '_incoming_remote_sdp'):
try:
audio_transport = AudioTransport(self.mixer, rtp_transport, self._incoming_remote_sdp, self._incoming_stream_index,
codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
finally:
del self._incoming_remote_sdp
del self._incoming_stream_index
else:
audio_transport = AudioTransport(self.mixer, rtp_transport, codecs=list(self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list))
- except SIPCoreError, e:
+ except SIPCoreError as e:
self.state = "ENDED"
self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0]))
return
self._rtp_transport = rtp_transport
self._transport = audio_transport
self.notification_center.add_observer(self, sender=audio_transport)
self._initialized = True
self.state = 'INITIALIZED'
self.notification_center.post_notification('MediaStreamDidInitialize', sender=self)
def _NH_RTPAudioStreamGotDTMF(self, notification):
self.notification_center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit))
def _NH_RTPAudioTransportDidTimeout(self, notification):
self.notification_center.post_notification('RTPStreamDidTimeout', sender=self)
def _NH_RTPTransportICENegotiationStateDidChange(self, notification):
with self._lock:
if self._ice_state != 'NULL' or self.state not in ('INITIALIZING', 'INITIALIZED', 'WAIT_ICE'):
return
self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data)
def _NH_RTPTransportICENegotiationDidSucceed(self, notification):
with self._lock:
if self.state != 'WAIT_ICE':
return
self._ice_state = 'IN_USE'
self.state = 'ESTABLISHED'
self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data)
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
def _NH_RTPTransportICENegotiationDidFail(self, notification):
with self._lock:
if self.state != 'WAIT_ICE':
return
self._ice_state = 'FAILED'
self.state = 'ESTABLISHED'
self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data)
self.notification_center.post_notification('MediaStreamDidStart', sender=self)
# Private methods
#
def _init_rtp_transport(self, stun_servers=None):
self._rtp_args = dict()
self._rtp_args['encryption'] = self._srtp_encryption
self._rtp_args['use_ice'] = self._try_ice
self._stun_servers = [(None, None)]
if stun_servers:
self._stun_servers.extend(reversed(stun_servers))
self._try_next_rtp_transport()
def _try_next_rtp_transport(self, failure_reason=None):
if self._stun_servers:
stun_address, stun_port = self._stun_servers.pop()
rtp_transport = None
try:
rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args)
self.notification_center.add_observer(self, sender=rtp_transport)
rtp_transport.set_INIT()
- except SIPCoreError, e:
+ except SIPCoreError as e:
if rtp_transport is not None:
self.notification_center.remove_observer(self, sender=rtp_transport)
self._try_next_rtp_transport(e.args[0])
else:
self.state = 'ENDED'
self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason))
def _check_hold(self, direction, is_initial):
was_on_hold_by_local = self.on_hold_by_local
was_on_hold_by_remote = self.on_hold_by_remote
was_inactive = self.direction == 'inactive'
self.direction = direction
inactive = self.direction == 'inactive'
self.on_hold_by_local = was_on_hold_by_local if inactive else direction == 'sendonly'
self.on_hold_by_remote = 'send' not in direction
if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold':
self.bridge.add(self)
if not was_on_hold_by_local and self.on_hold_by_local:
self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=True))
if was_on_hold_by_local and not self.on_hold_by_local:
self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='local', on_hold=False))
if not was_on_hold_by_remote and self.on_hold_by_remote:
self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=True))
if was_on_hold_by_remote and not self.on_hold_by_remote:
self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator='remote', on_hold=False))
diff --git a/sylk/bonjour.py b/sylk/bonjour.py
index b7918c8..4e948cb 100644
--- a/sylk/bonjour.py
+++ b/sylk/bonjour.py
@@ -1,256 +1,256 @@
import uuid
from application import log
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from eventlib import api, coros, proc
from eventlib.green import select
from sipsimple.account.bonjour import _bonjour, BonjourPresenceState, BonjourRegistrationFile
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.threading import call_in_twisted_thread, run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from threading import Lock
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
class RestartSelect(Exception): pass
class BonjourService(object):
implements(IObserver)
def __init__(self, service='sipfocus', name='SylkServer', uri_user=None, is_focus=True):
self.account = DefaultAccount()
self.service = service
self.name = name
self.uri_user = uri_user
self.is_focus = is_focus
self.id = str(uuid.uuid4())
self._stopped = True
self._files = []
self._command_channel = coros.queue()
self._select_proc = None
self._register_timer = None
self._update_timer = None
self._lock = Lock()
self.__dict__['presence_state'] = None
@run_in_green_thread
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._select_proc = proc.spawn(self._process_files)
proc.spawn(self._handle_commands)
self._activate()
@run_in_green_thread
def stop(self):
self._deactivate()
notification_center = NotificationCenter()
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._select_proc.kill()
self._command_channel.send_exception(api.GreenletExit)
def _activate(self):
self._stopped = False
self._command_channel.send(Command('register'))
def _deactivate(self):
command = Command('stop')
self._command_channel.send(command)
command.wait()
self._stopped = True
def restart_registration(self):
self._command_channel.send(Command('unregister'))
self._command_channel.send(Command('register'))
def update_registrations(self):
self._command_channel.send(Command('update_registrations'))
def _get_presence_state(self):
return self.__dict__['presence_state']
def _set_presence_state(self, state):
if state is not None and not isinstance(state, BonjourPresenceState):
raise ValueError("state must be a %s instance or None" % BonjourPresenceState.__name__)
with self._lock:
old_state = self.__dict__['presence_state']
self.__dict__['presence_state'] = state
if state != old_state:
call_in_twisted_thread(self.update_registrations)
presence_state = property(_get_presence_state, _set_presence_state)
del _get_presence_state, _set_presence_state
def _register_cb(self, file, flags, error_code, name, regtype, domain):
notification_center = NotificationCenter()
file = BonjourRegistrationFile.find_by_file(file)
if error_code == _bonjour.kDNSServiceErr_NoError:
notification_center.post_notification('BonjourServiceRegistrationDidSucceed', sender=self,
data=NotificationData(name=name, transport=file.transport))
else:
error = _bonjour.BonjourError(error_code)
notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self,
data=NotificationData(reason=str(error), transport=file.transport))
self._files.remove(file)
self._select_proc.kill(RestartSelect)
file.close()
if self._register_timer is None:
self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register'))
def _process_files(self):
while True:
try:
ready = select.select([f for f in self._files if not f.active and not f.closed], [], [])[0]
except RestartSelect:
continue
else:
for file in ready:
file.active = True
self._command_channel.send(Command('process_results', files=[f for f in ready if not f.closed]))
def _handle_commands(self):
while True:
command = self._command_channel.wait()
if not self._stopped:
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _CH_unregister(self, command):
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
old_files = []
for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile)):
old_files.append(file)
self._files.remove(file)
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
notification_center = NotificationCenter()
for transport in set(file.transport for file in self._files):
notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport))
command.signal()
def _CH_register(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
supported_transports = set(transport for transport in settings.sip.transport_list if transport!='tls' or self.account.tls.certificate is not None)
registered_transports = set(file.transport for file in self._files if isinstance(file, BonjourRegistrationFile))
missing_transports = supported_transports - registered_transports
added_transports = set()
for transport in missing_transports:
notification_center.post_notification('BonjourServiceWillRegister', sender=self, data=NotificationData(transport=transport))
try:
contact_uri = self.account.contact[transport]
contact_uri.user = self.uri_user
if self.is_focus:
contact_uri.parameters['isfocus'] = None
txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id)
state = self.presence_state
if state is not None:
txtdata['state'] = state.state
txtdata['note'] = state.note.encode('utf-8')
file = _bonjour.DNSServiceRegister(name=str(contact_uri),
regtype="_%s._%s" % (self.service, transport if transport == 'udp' else 'tcp'),
port=contact_uri.port,
callBack=self._register_cb,
txtRecord=_bonjour.TXTRecord(items=txtdata))
- except (_bonjour.BonjourError, KeyError), e:
+ except (_bonjour.BonjourError, KeyError) as e:
notification_center.post_notification('BonjourServiceRegistrationDidFail', sender=self,
data=NotificationData(reason=str(e), transport=transport))
else:
self._files.append(BonjourRegistrationFile(file, transport))
added_transports.add(transport)
if added_transports:
self._select_proc.kill(RestartSelect)
if added_transports != missing_transports:
self._register_timer = reactor.callLater(1, self._command_channel.send, Command('register', command.event))
else:
command.signal()
def _CH_update_registrations(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
available_transports = settings.sip.transport_list
old_files = []
for file in (f for f in self._files[:] if isinstance(f, BonjourRegistrationFile) and f.transport not in available_transports):
old_files.append(file)
self._files.remove(file)
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
update_failure = False
for file in (f for f in self._files if isinstance(f, BonjourRegistrationFile)):
try:
contact_uri = self.account.contact[file.transport]
contact_uri.user = self.uri_user
if self.is_focus:
contact_uri.parameters['isfocus'] = None
txtdata = dict(txtvers=1, name=self.name, contact="<%s>" % str(contact_uri), instance_id=self.id)
state = self.presence_state
if state is not None:
txtdata['state'] = state.state
txtdata['note'] = state.note.encode('utf-8')
_bonjour.DNSServiceUpdateRecord(file.file, None, flags=0, rdata=_bonjour.TXTRecord(items=txtdata), ttl=0)
- except (_bonjour.BonjourError, KeyError), e:
+ except (_bonjour.BonjourError, KeyError) as e:
notification_center.post_notification('BonjourServiceRegistrationUpdateDidFail', sender=self,
data=NotificationData(reason=str(e), transport=file.transport))
update_failure = True
self._command_channel.send(Command('register'))
if update_failure:
self._update_timer = reactor.callLater(1, self._command_channel.send, Command('update_registrations', command.event))
else:
command.signal()
def _CH_process_results(self, command):
for file in (f for f in command.files if not f.closed):
try:
_bonjour.DNSServiceProcessResult(file.file)
except:
# Should we close the file? The documentation doesn't say anything about this. -Luci
log.exception()
for file in command.files:
file.active = False
self._files = [f for f in self._files if not f.closed]
self._select_proc.kill(RestartSelect)
def _CH_stop(self, command):
if self._register_timer is not None and self._register_timer.active():
self._register_timer.cancel()
self._register_timer = None
if self._update_timer is not None and self._update_timer.active():
self._update_timer.cancel()
self._update_timer = None
old_files = self._files
self._files = []
self._select_proc.kill(RestartSelect)
for file in old_files:
file.close()
notification_center = NotificationCenter()
for transport in set(file.transport for file in self._files):
notification_center.post_notification('BonjourServiceRegistrationDidEnd', sender=self, data=NotificationData(transport=transport))
command.signal()
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_NetworkConditionsDidChange(self, notification):
if self._files:
self.restart_registration()
diff --git a/sylk/server.py b/sylk/server.py
index 4a6cf6f..69583bc 100644
--- a/sylk/server.py
+++ b/sylk/server.py
@@ -1,256 +1,256 @@
import os
import sys
from threading import Event
from uuid import uuid4
from application import log
from application.notification import NotificationCenter
from application.python import Null
from application.system import makedirs
from eventlib import proc
from sipsimple.account import Account, BonjourAccount, AccountManager
from sipsimple.application import SIPApplication
from sipsimple.audio import AudioDevice, RootAudioBridge
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioMixer
from sipsimple.lookup import DNSManager
from sipsimple.storage import MemoryStorage
from sipsimple.threading import ThreadManager
from sipsimple.threading.green import run_in_green_thread
from sipsimple.video import VideoDevice
from twisted.internet import reactor
# Load stream extensions needed for integration with SIP SIMPLE SDK
import sylk.streams
del sylk.streams
from sylk.accounts import DefaultAccount
from sylk.applications import IncomingRequestHandler
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension
from sylk.log import TraceLogManager
from sylk.session import SessionManager
from sylk.web import WebServer
class SylkServer(SIPApplication):
def __init__(self):
self.request_handler = Null
self.thor_interface = Null
self.web_server = Null
self.options = Null
self.stopping_event = Event()
self.stop_event = Event()
self.failed = False
def start(self, options):
self.options = options
if self.options.enable_bonjour:
ServerConfig.enable_bonjour = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.add_observer(self, name='ThorNetworkGotFatalError')
Account.register_extension(AccountExtension)
BonjourAccount.register_extension(BonjourAccountExtension)
SIPSimpleSettings.register_extension(SylkServerSettingsExtension)
try:
super(SylkServer, self).start(MemoryStorage())
- except Exception, e:
+ except Exception as e:
log.fatal('Error starting SIP Application: %s' % e)
sys.exit(1)
def _initialize_core(self):
# SylkServer needs to listen for extra events and request types
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
# initialize core
options = dict(# general
ip_address=SIPConfig.local_ip,
user_agent=settings.user_agent,
# SIP
detect_sip_loops=False,
udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None,
tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None,
tls_port=None,
# TLS
tls_verify_server=False,
tls_ca_file=None,
tls_cert_file=None,
tls_privkey_file=None,
# rtp
rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end),
# audio
codecs=list(settings.rtp.audio_codec_list),
# video
video_codecs=list(settings.rtp.video_codec_list),
enable_colorbar_device=True,
# logging
log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0,
trace_sip=settings.logs.trace_sip,
# events and requests to handle
events={'conference': ['application/conference-info+xml'],
'presence': ['application/pidf+xml'],
'refer': ['message/sipfrag;version=2.0']},
incoming_events={'conference', 'presence'},
incoming_requests={'MESSAGE'})
notification_center.add_observer(self, sender=self.engine)
self.engine.start(**options)
@run_in_green_thread
def _initialize_subsystems(self):
notification_center = NotificationCenter()
with self._lock:
stop_pending = self._stop_pending
if stop_pending:
self.state = 'stopping'
if stop_pending:
notification_center.post_notification('SIPApplicationWillEnd', sender=self)
reactor.stop()
return
account_manager = AccountManager()
dns_manager = DNSManager()
session_manager = SessionManager()
settings = SIPSimpleSettings()
# Initialize default account
default_account = DefaultAccount()
account_manager.default_account = default_account
# initialize TLS
self._initialize_tls()
# initialize PJSIP internal resolver
self.engine.set_nameservers(dns_manager.nameservers)
# initialize audio objects
voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999)
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
# initialize video objects
self.video_device = VideoDevice(u'Colorbar generator', settings.video.resolution, settings.video.framerate)
# initialize instance id
settings.instance_id = uuid4().urn
settings.save()
# initialize ZRTP cache
makedirs(ServerConfig.spool_dir.normalized)
self.engine.zrtp_cache = os.path.join(ServerConfig.spool_dir.normalized, 'zrtp.db')
# initialize middleware components
dns_manager.start()
account_manager.start()
session_manager.start()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange')
self.state = 'started'
notification_center.post_notification('SIPApplicationDidStart', sender=self)
# start SylkServer components
self.web_server = WebServer()
self.web_server.start()
self.request_handler = IncomingRequestHandler()
self.request_handler.start()
if ThorNodeConfig.enabled:
from sylk.interfaces.sipthor import ConferenceNode
self.thor_interface = ConferenceNode()
thor_roles = []
if 'conference' in self.request_handler.application_registry:
thor_roles.append('conference_server')
if 'xmppgateway' in self.request_handler.application_registry:
thor_roles.append('xmpp_gateway')
if 'webrtcgateway' in self.request_handler.application_registry:
thor_roles.append('webrtc_gateway')
self.thor_interface.start(thor_roles)
@run_in_green_thread
def _shutdown_subsystems(self):
dns_manager = DNSManager()
account_manager = AccountManager()
session_manager = SessionManager()
# terminate all sessions
p = proc.spawn(session_manager.stop)
p.wait()
# shutdown SylkServer components
procs = [proc.spawn(self.web_server.stop), proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)]
proc.waitall(procs)
# shutdown other middleware components
procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop)]
proc.waitall(procs)
# shutdown engine
self.engine.stop()
self.engine.join(timeout=5)
# stop threads
thread_manager = ThreadManager()
thread_manager.stop()
# stop the reactor
reactor.stop()
def _NH_AudioDevicesDidChange(self, notification):
pass
def _NH_DefaultAudioDeviceDidChange(self, notification):
pass
def _NH_SIPApplicationFailedToStartTLS(self, notification):
log.fatal('Could not set TLS options: %s' % notification.data.error)
sys.exit(1)
def _NH_SIPApplicationWillStart(self, notification):
tracelog_manager = TraceLogManager()
tracelog_manager.start()
def _NH_SIPApplicationDidStart(self, notification):
settings = SIPSimpleSettings()
local_ip = SIPConfig.local_ip
log.info('SylkServer started, listening on:')
for transport in settings.sip.transport_list:
try:
log.info(' %s:%d (%s)' % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper()))
except TypeError:
pass
def _NH_SIPApplicationWillEnd(self, notification):
self.stopping_event.set()
def _NH_SIPApplicationDidEnd(self, notification):
log.info('SIP application ended')
tracelog_manager = TraceLogManager()
tracelog_manager.stop()
if not self.stopping_event.is_set():
log.warning('SIP application ended without shutting down all subsystems')
self.stopping_event.set()
self.stop_event.set()
def _NH_SIPApplicationGotFatalError(self, notification):
log.error('An exception occurred within the SIP core:\n%s\n' % notification.data.traceback)
self.failed = True
def _NH_SIPEngineDidFail(self, notification):
log.error('SIP engine failed')
self.failed = True
super(SylkServer, self)._NH_SIPEngineDidFail(notification)
def _NH_ThorNetworkGotFatalError(self, notification):
log.error("All Thor Event Servers have unrecoverable errors.")
diff --git a/sylk/session.py b/sylk/session.py
index c336447..0f52841 100644
--- a/sylk/session.py
+++ b/sylk/session.py
@@ -1,1978 +1,1978 @@
import random
from threading import RLock
from time import time
from application import log
from application.notification import IObserver, Notification, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.decorator import decorator, preserve_signature
from application.python.types import Singleton
from application.system import host
from eventlib import api, coros, proc
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, SIPCoreInvalidStateError, PJSIPError, sip_status_messages
from sipsimple.core import ContactHeader, RouteHeader, FromHeader, ToHeader, ReasonHeader, WarningHeader
from sipsimple.core import SIPURI, SDPConnection, SDPSession, SDPMediaStream
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import ParserError
from sipsimple.payloads.conference import ConferenceDocument
from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.configuration import SIPConfig
class InvitationDisconnectedError(Exception):
def __init__(self, invitation, data):
self.invitation = invitation
self.data = data
class MediaStreamDidNotInitializeError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class MediaStreamDidFailError(Exception):
def __init__(self, stream, data):
self.stream = stream
self.data = data
class SubscriptionError(Exception):
def __init__(self, error, timeout, **attributes):
self.error = error
self.timeout = timeout
self.attributes = attributes
class SIPSubscriptionDidFail(Exception):
def __init__(self, data):
self.data = data
class InterruptSubscription(Exception):
pass
class TerminateSubscription(Exception):
pass
class IllegalStateError(RuntimeError):
pass
@decorator
def transition_state(required_state, new_state):
def state_transitioner(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
with obj._lock:
if obj.state != required_state:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
obj.state = new_state
return func(obj, *args, **kwargs)
return wrapper
return state_transitioner
@decorator
def check_state(required_states):
def state_checker(func):
@preserve_signature(func)
def wrapper(obj, *args, **kwargs):
if obj.state not in required_states:
raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state))
return func(obj, *args, **kwargs)
return wrapper
return state_checker
class ConferenceHandler(object):
implements(IObserver)
def __init__(self, session):
self.session = session
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, name='NetworkConditionsDidChange')
self._command_proc = proc.spawn(self._run)
def _run(self):
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _activate(self):
self.active = True
command = Command('subscribe')
self._command_channel.send(command)
return command
def _deactivate(self):
self.active = False
command = Command('unsubscribe')
self._command_channel.send(command)
return command
def _resubscribe(self):
command = Command('subscribe')
self._command_channel.send(command)
return command
def _terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session)
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._deactivate()
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self.session = None
def _CH_subscribe(self, command):
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
command.signal()
raise proc.ProcExit()
def _subscription_handler(self, command):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
- except DNSLookupError, e:
+ except DNSLookupError as e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
target_uri = SIPURI.new(self.session.remote_identity.uri)
refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval)
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
try:
contact_uri = account.contact[route]
except KeyError:
continue
subscription = Subscription(target_uri,
FromHeader(SIPURI.new(self.session.local_identity.uri)),
ToHeader(target_uri),
ContactHeader(contact_uri),
'conference',
RouteHeader(route.uri),
credentials=account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
timeout = 5
raise SubscriptionError(error='Internal error', timeout=timeout)
self._subscription = subscription
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
- except SIPSubscriptionDidFail, e:
+ except SIPSubscriptionDidFail as e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
timeout = random.uniform(60, 120)
raise SubscriptionError(error='Authentication failed', timeout=timeout)
elif e.data.code == 423:
# Get the value of the Min-Expires header
timeout = random.uniform(60, 120)
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires)
else:
raise SubscriptionError(error='Interval too short', timeout=timeout)
elif e.data.code in (405, 406, 489, 1400):
command.signal(e)
return
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
timeout = random.uniform(60, 180)
raise SubscriptionError(error='No more routes to try', timeout=timeout)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
if notification.sender is not self._subscription:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'conference' and notification.data.body:
try:
conference_info = ConferenceDocument.parse(notification.data.body)
except ParserError:
pass
else:
notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info))
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
- except InterruptSubscription, e:
+ except InterruptSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
- except TerminateSubscription, e:
+ except TerminateSubscription as e:
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
- except SubscriptionError, e:
+ except SubscriptionError as e:
if 'min_expires' in e.attributes:
command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires'])
else:
command = Command('subscribe', command.event)
self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_SIPSessionDidStart(self, notification):
if self.session.remote_focus:
self._activate()
@run_in_green_thread
def _NH_SIPSessionDidFail(self, notification):
self._terminate()
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
self._terminate()
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
if self.session.remote_focus and not self.active:
self._activate()
elif not self.session.remote_focus and self.active:
self._deactivate()
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._resubscribe()
class Session(object):
implements(IObserver)
media_stream_timeout = 15
short_reinvite_timeout = 5
def __init__(self, account):
self.account = account
self.direction = None
self.end_time = None
self.on_hold = False
self.proposed_streams = None
self.route = None
self.state = None
self.start_time = None
self.streams = None
self.transport = None
self.local_focus = False
self.remote_focus = False
self.greenlet = None
self.conference = None
self._channel = coros.queue()
self._hold_in_progress = False
self._invitation = None
self._local_identity = None
self._remote_identity = None
self._lock = RLock()
def init_incoming(self, invitation, data):
remote_sdp = invitation.sdp.proposed_remote
if not remote_sdp:
invitation.send_response(488)
return
self.proposed_streams = []
for index, media_stream in enumerate(remote_sdp.media):
if media_stream.port != 0:
for stream_type in MediaStreamRegistry:
try:
stream = stream_type.new_from_sdp(self, remote_sdp, index)
except UnknownStreamError:
continue
except InvalidStreamError as e:
log.error("Invalid stream: {}".format(e))
break
except Exception as e:
log.exception("Exception occurred while setting up stream from SDP: {}".format(e))
break
else:
stream.index = index
self.proposed_streams.append(stream)
break
if not self.proposed_streams:
invitation.send_response(488)
return
if 'Replaces' in data.headers:
invitation.send_response(403)
return
self.direction = 'incoming'
self.state = 'incoming'
self.transport = invitation.transport
self._invitation = invitation
self.conference = ConferenceHandler(self)
if 'isfocus' in invitation.remote_contact_header.parameters:
self.remote_focus = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=invitation)
notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers))
@transition_state(None, 'connecting')
@run_in_green_thread
def connect(self, from_header, to_header, route, streams, is_focus=False, contact_header=None, extra_headers=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
connected = False
unhandled_notifications = []
extra_headers = extra_headers or []
if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers):
raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed')
self.direction = 'outgoing'
self.proposed_streams = streams
self.route = route
self.transport = self.route.transport
self.local_focus = is_focus
self._invitation = Invitation()
self._local_identity = from_header
self._remote_identity = to_header
self.conference = ConferenceHandler(self)
notification_center.add_observer(self, sender=self._invitation)
notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams[:]))
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
if contact_header is None:
try:
contact_uri = self.account.contact[self.route]
- except KeyError, e:
+ except KeyError as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e))
return
else:
contact_header = ContactHeader(contact_uri)
if SIPConfig.advertised_ip not in (None, '0.0.0.0'):
local_ip = SIPConfig.advertised_ip.normalized
elif SIPConfig.local_ip not in (None, '0.0.0.0'):
local_ip = SIPConfig.local_ip.normalized
else:
local_ip = contact_header.uri.host
connection = SDPConnection(local_ip)
local_sdp = SDPSession(local_ip, name=settings.user_agent)
for index, stream in enumerate(self.proposed_streams):
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates):
media.connection = connection
local_sdp.media.append(media)
route_header = RouteHeader(self.route.uri)
if is_focus:
contact_header.parameters['isfocus'] = None
self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers)
try:
with api.timeout(settings.sip.invite_timeout):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self, )
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected':
if not connected:
connected = True
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.end()
return
notification_center.post_notification('SIPSessionWillStart', self)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map[index]
if remote_media.port:
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
invitation_notifications = []
with api.timeout(self.media_stream_timeout):
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
invitation_notifications.append(notification)
[self._channel.send(notification) for notification in invitation_notifications]
while not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'early':
if notification.data.code == 180:
notification_center.post_notification('SIPSessionGotRingIndication', self)
notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
elif notification.data.state == 'connected':
if not connected:
connected = True
else:
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
- except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
if isinstance(e, api.TimeoutError):
error = 'media stream timed out while starting'
elif isinstance(e, MediaStreamDidNotInitializeError):
error = 'media stream did not initialize: %s' % e.data.reason
else:
error = 'media stream failed: %s' % e.data.reason
self._fail(originator='local', code=0, reason=None, error=error)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
# As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator))
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
else:
if e.data.originator == 'remote':
code = e.data.code
reason = e.data.reason
else:
code = getattr(e.data, 'code', 0)
reason = getattr(e.data, 'reason', 'Session disconnected')
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
self.greenlet = None
- except SIPCoreError, e:
+ except SIPCoreError as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = ISOTimestamp.now()
any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams)
if any_stream_ice:
self._reinvite_after_ice()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
def _reinvite_after_ice(self):
# This function does not do any error checking, it's designed to be called at the end of connect and ad
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for index, stream in enumerate(self.streams):
local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for index, stream in enumerate(self.streams):
stream.update(local_sdp, remote_sdp, index)
else:
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
self.end()
return
except Exception:
pass
finally:
self.state = 'connected'
self.greenlet = None
@check_state(['incoming', 'received_proposal'])
@run_in_green_thread
def send_ring_indication(self):
try:
self._invitation.send_response(180)
except SIPCoreInvalidStateError:
pass # The INVITE session might have already been cancelled; ignore the error
@transition_state('incoming', 'accepting')
@run_in_green_thread
def accept(self, streams, is_focus=False, extra_headers=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
self.local_focus = is_focus
connected = False
unhandled_notifications = []
extra_headers = extra_headers or []
if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers):
raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed')
for stream in self.proposed_streams:
if stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
self.proposed_streams = streams
wait_count = len(self.proposed_streams)
try:
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
remote_sdp = self._invitation.sdp.proposed_remote
if SIPConfig.advertised_ip not in (None, '0.0.0.0'):
local_ip = SIPConfig.advertised_ip.normalized
elif SIPConfig.local_ip not in (None, '0.0.0.0'):
local_ip = SIPConfig.local_ip.normalized
else:
sdp_connection = remote_sdp.connection or next(media.connection for media in remote_sdp.media if media.connection is not None)
local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address
if local_ip is None:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address')
return
connection = SDPConnection(local_ip)
local_sdp = SDPSession(local_ip, name=settings.user_agent)
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
if not media.has_ice_attributes and not media.has_ice_candidates:
media.connection = connection
else:
media = SDPMediaStream.new(media)
media.connection = connection
media.port = 0
media.attributes = []
media.bandwidth_info = []
local_sdp.media.append(media)
contact_header = ContactHeader.new(self._invitation.local_contact_header)
try:
local_contact_uri = self.account.contact[self._invitation.transport]
contact_header.uri = local_contact_uri
except KeyError:
pass
if is_focus:
contact_header.parameters['isfocus'] = None
self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers)
notification_center.post_notification('SIPSessionWillStart', sender=self)
# Local and remote SDPs will be set after the 200 OK is sent
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
break
else:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
wait_count = 0
stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
for index, local_media in enumerate(local_sdp.media):
remote_media = remote_sdp.media[index]
stream = stream_map.get(index, None)
if stream is not None:
if remote_media.port:
wait_count += 1
stream.start(local_sdp, remote_sdp, index)
else:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
for stream in removed_streams:
notification_center.remove_observer(self, sender=stream)
self.proposed_streams.remove(stream)
del stream_map[stream.index]
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
while wait_count > 0 or not connected or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected':
if not connected:
connected = True
elif notification.data.prev_state == 'connected':
unhandled_notifications.append(notification)
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
- except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError), e:
+ except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
reason_header = None
if isinstance(e, api.TimeoutError):
if wait_count > 0:
error = 'media stream timed out while starting'
else:
error = 'No ACK received'
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'Missing ACK'
elif isinstance(e, MediaStreamDidNotInitializeError):
error = 'media stream did not initialize: %s' % e.data.reason
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'media stream did not initialize'
else:
error = 'media stream failed: %s' % e.data.reason
reason_header = ReasonHeader('SIP')
reason_header.cause = 500
reason_header.text = 'media stream failed to start'
self.start_time = ISOTimestamp.now()
if self._invitation.state in ('incoming', 'early'):
self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header)
else:
self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
if e.data.prev_state in ('incoming', 'early'):
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None))
elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK':
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote'))
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason))
self.greenlet = None
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.greenlet = None
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
- except SIPCoreError, e:
+ except SIPCoreError as e:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.proposed_streams
self.proposed_streams = None
self.start_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('incoming', 'terminating')
@run_in_green_thread
def reject(self, code=603, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'disconnected':
break
except SIPCoreInvalidStateError:
# the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e))
except api.TimeoutError:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None))
else:
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
self.proposed_streams = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None))
@transition_state('received_proposal', 'accepting_proposal')
@run_in_green_thread
def accept_proposal(self, streams):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
streams = [stream for stream in streams if stream in self.proposed_streams]
for stream in streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='incoming')
try:
wait_count = len(streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
remote_sdp = self._invitation.sdp.proposed_remote
connection = SDPConnection(local_sdp.address)
stream_map = dict((stream.index, stream) for stream in streams)
for index, media in enumerate(remote_sdp.media):
stream = stream_map.get(index, None)
if stream is not None:
media = stream.get_local_media(remote_sdp=remote_sdp, index=index)
if not media.has_ice_attributes and not media.has_ice_candidates:
media.connection = connection
if index < len(local_sdp.media):
local_sdp.media[index] = media
else:
local_sdp.media.append(media)
elif index >= len(local_sdp.media): # actually == is sufficient
media = SDPMediaStream.new(media)
media.connection = connection
media.port = 0
media.attributes = []
media.bandwidth_info = []
local_sdp.media.append(media)
self._invitation.send_response(200, sdp=local_sdp)
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
received_invitation_state = False
received_sdp_update = False
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
for stream in streams:
# TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream becuase it couldn't
# negotiation failed. If there are more streams, however, the negotiation is considered successful as a
# whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io
# OK, but we cannot really start the stream. -Saul
stream.start(local_sdp, remote_sdp, stream.index)
with api.timeout(self.media_stream_timeout):
wait_count = len(streams)
while wait_count > 0 or self._channel:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
else:
unhandled_notifications.append(notification)
except api.TimeoutError:
self._fail_proposal(originator='remote', error='media stream timed out while starting')
except MediaStreamDidNotInitializeError as e:
self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e))
except MediaStreamDidFailError as e:
self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e))
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self._fail_proposal(originator='remote', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams = self.streams + streams
proposed_streams = self.proposed_streams[:]
self.proposed_streams = None
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams))
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('received_proposal', 'rejecting_proposal')
@run_in_green_thread
def reject_proposal(self, code=488, reason=None):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.send_response(code, reason)
with api.timeout(1, None):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
break
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
proposed_streams = self.proposed_streams[:]
self.proposed_streams = None
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams))
if self._hold_in_progress:
self._send_hold()
def add_stream(self, stream):
self.add_streams([stream])
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def add_streams(self, streams):
streams = list(set(streams).difference(self.streams))
if not streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
unhandled_notifications = []
self.proposed_streams = streams
for stream in self.proposed_streams:
notification_center.add_observer(self, sender=stream)
stream.initialize(self, direction='outgoing')
try:
wait_count = len(self.proposed_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidInitialize':
wait_count -= 1
elif notification.name == 'SIPInvitationChangedState':
# This is actually the only reason for which this notification could be received
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self._fail_proposal(originator='local', error='received stream proposal')
self.handle_notification(notification)
return
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.proposed_streams:
# Try to reuse a disabled media stream to avoid an ever-growing SDP
try:
index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0)
reuse_media = True
except StopIteration:
index = len(local_sdp.media)
reuse_media = False
stream.index = index
media = stream.get_local_media(remote_sdp=None, index=index)
if reuse_media:
local_sdp.media[index] = media
else:
local_sdp.media.append(media)
self._invitation.send_reinvite(sdp=local_sdp)
notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:]))
received_invitation_state = False
received_sdp_update = False
try:
with api.timeout(settings.sip.invite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
else:
self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error)
return
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
if notification.data.code >= 300:
for stream in self.proposed_streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.greenlet = None
self.state = 'connected'
proposed_streams = self.proposed_streams[:]
self.proposed_streams = None
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
return
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
except api.TimeoutError:
self.cancel_proposal()
return
accepted_streams = []
for stream in self.proposed_streams:
try:
remote_media = remote_sdp.media[stream.index]
except IndexError:
self._fail_proposal(originator='local', error='SDP media missing in answer')
return
else:
if remote_media.port:
stream.start(local_sdp, remote_sdp, stream.index)
accepted_streams.append(stream)
else:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
with api.timeout(self.media_stream_timeout):
wait_count = len(accepted_streams)
while wait_count > 0:
notification = self._channel.wait()
if notification.name == 'MediaStreamDidStart':
wait_count -= 1
except api.TimeoutError:
self._fail_proposal(originator='local', error='media stream timed out while starting')
except MediaStreamDidNotInitializeError as e:
self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e))
except MediaStreamDidFailError as e:
self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e))
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self._fail_proposal(originator='local', error='session ended')
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
- except SIPCoreError, e:
+ except SIPCoreError as e:
self._fail_proposal(originator='local', error='SIP core error: %s' % str(e))
else:
self.greenlet = None
self.state = 'connected'
self.streams += accepted_streams
proposed_streams = self.proposed_streams
self.proposed_streams = None
any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams)
if any_stream_ice:
self._reinvite_after_ice()
notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams))
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[]))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
def remove_stream(self, stream):
self.remove_streams([stream])
@transition_state('connected', 'sending_proposal')
@run_in_green_thread
def remove_streams(self, streams):
streams = list(set(streams).intersection(self.streams))
if not streams:
self.state = 'connected'
return
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in streams:
notification_center.remove_observer(self, sender=stream)
stream.deactivate()
self.streams.remove(stream)
media = local_sdp.media[stream.index]
media.port = 0
media.attributes = []
media.bandwidth_info = []
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for s in self.streams:
s.update(local_sdp, remote_sdp, s.index)
else:
# TODO
pass
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
if not (200 <= notification.data.code < 300):
break
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
for stream in streams:
stream.end()
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError):
for stream in streams:
stream.end()
self.end()
else:
for stream in streams:
stream.end()
self.greenlet = None
self.state = 'connected'
notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._send_hold()
@transition_state('sending_proposal', 'cancelling_proposal')
@run_in_green_thread
def cancel_proposal(self):
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
if notification.data.code == 487:
proposed_streams = (self.proposed_streams or [])[:]
for stream in proposed_streams:
stream.deactivate()
stream.end()
self.state = 'connected'
self.proposed_streams = None
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
elif notification.data.code == 200:
self.end()
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
break
- except SIPCoreError, e:
+ except SIPCoreError as e:
proposed_streams = (self.proposed_streams or [])[:]
for stream in proposed_streams:
stream.deactivate()
stream.end()
self.greenlet = None
self.state = 'connected'
self.proposed_streams = None
notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams))
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.proposed_streams = None
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
else:
self.proposed_streams = None
self.greenlet = None
self.state = 'connected'
finally:
if self._hold_in_progress:
self._send_hold()
@run_in_green_thread
def hold(self):
if self.on_hold or self._hold_in_progress:
return
self._hold_in_progress = True
streams = (self.streams or []) + (self.proposed_streams or [])
if not streams:
return
for stream in streams:
stream.hold()
if self.state == 'connected':
self._send_hold()
@run_in_green_thread
def unhold(self):
if not self.on_hold and not self._hold_in_progress:
return
self._hold_in_progress = False
streams = (self.streams or []) + (self.proposed_streams or [])
if not streams:
return
for stream in streams:
stream.unhold()
if self.state == 'connected':
self._send_unhold()
@run_in_green_thread
def end(self):
if self.state in (None, 'terminating', 'terminated'):
return
if self.greenlet is not None:
api.kill(self.greenlet, api.GreenletExit())
self.greenlet = None
notification_center = NotificationCenter()
if self._invitation is None or self._invitation.state is None:
# The invitation was not yet constructed
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
return
invitation_state = self._invitation.state
if invitation_state in ('disconnecting', 'disconnected'):
return
self.greenlet = api.getcurrent()
self.state = 'terminating'
if invitation_state == 'connected':
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local'))
streams = (self.streams or []) + (self.proposed_streams or [])
for stream in streams[:]:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
streams.remove(stream)
else:
stream.deactivate()
cancelling = invitation_state != 'connected' and self.direction == 'outgoing'
try:
self._invitation.end(timeout=1)
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
break
- except SIPCoreError, e:
+ except SIPCoreError as e:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None))
else:
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e)))
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
# As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
if e.data.prev_state == 'connected':
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote':
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None))
else:
if e.data.originator == 'remote':
code = e.data.code
reason = e.data.reason
elif e.data.disconnect_reason == 'timeout':
code = 408
reason = 'timeout'
else:
code = 0
reason = None
if e.data.originator == 'remote' and code // 100 == 3:
redirect_identities = e.data.headers.get('Contact', [])
else:
redirect_identities = None
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
else:
if cancelling:
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None))
else:
self.end_time = ISOTimestamp.now()
notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request'))
finally:
for stream in streams:
stream.end()
notification_center.remove_observer(self, sender=self._invitation)
self.greenlet = None
self.state = 'terminated'
@property
def local_identity(self):
if self._invitation is not None and self._invitation.local_identity is not None:
return self._invitation.local_identity
else:
return self._local_identity
@property
def peer_address(self):
return self._invitation.peer_address if self._invitation is not None else None
@property
def remote_identity(self):
if self._invitation is not None and self._invitation.remote_identity is not None:
return self._invitation.remote_identity
else:
return self._remote_identity
@property
def remote_user_agent(self):
return self._invitation.remote_user_agent if self._invitation is not None else None
@property
def call_id(self):
return self._invitation.call_id if self._invitation is not None else None
@property
def request_uri(self):
return self._invitation.request_uri if self._invitation is not None else None
def _cancel_hold(self):
notification_center = NotificationCenter()
try:
self._invitation.cancel_reinvite()
while True:
try:
notification = self._channel.wait()
except MediaStreamDidFailError:
continue
if notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
if notification.data.code == 200:
self.end()
return False
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
break
except SIPCoreError:
pass
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return False
return True
def _send_hold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
# TODO
pass
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return
except api.TimeoutError:
if not self._cancel_hold():
return
except SIPCoreError:
pass
self.greenlet = None
self.on_hold = True
self.state = 'connected'
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams)))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
self._hold_in_progress = False
else:
for stream in self.streams:
stream.unhold()
self._send_unhold()
def _send_unhold(self):
self.state = 'sending_proposal'
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
unhandled_notifications = []
try:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index)
self._invitation.send_reinvite(sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
with api.timeout(self.short_reinvite_timeout):
while not received_invitation_state or not received_sdp_update:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.greenlet = None
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = notification_center
self.handle_notification(notification)
return
except api.TimeoutError:
if not self._cancel_hold():
return
except SIPCoreError:
pass
self.greenlet = None
self.on_hold = False
self.state = 'connected'
notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False))
for notification in unhandled_notifications:
self.handle_notification(notification)
if self._hold_in_progress:
for stream in self.streams:
stream.hold()
self._send_hold()
def _fail(self, originator, code, reason, error, reason_header=None):
notification_center = NotificationCenter()
prev_inv_state = self._invitation.state
self.state = 'terminating'
if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'):
notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator))
if self._invitation.state not in (None, 'disconnecting', 'disconnected'):
try:
if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'):
if 400 <= code <= 699 and reason is not None:
self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else [])
else:
self._invitation.end(extra_headers=[reason_header] if reason_header is not None else [])
with api.timeout(1):
while True:
notification = self._channel.wait()
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected':
break
except (api.TimeoutError, SIPCoreError):
pass
notification_center.remove_observer(self, sender=self._invitation)
self.state = 'terminated'
notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None))
self.greenlet = None
def _fail_proposal(self, originator, error):
notification_center = NotificationCenter()
has_streams = bool(self.proposed_streams)
for stream in self.proposed_streams:
try:
notification_center.remove_observer(self, sender=stream)
except KeyError:
# _fail_proposal can be called from reject_proposal, which means the stream will
# not have been initialized or the session registered as an observer for it.
pass
else:
stream.deactivate()
stream.end()
if originator == 'remote' and self._invitation.sub_state == 'received_proposal':
try:
self._invitation.send_response(488 if has_streams else 500)
except SIPCoreError:
pass
notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:]))
self.state = 'connected'
self.proposed_streams = None
self.greenlet = None
@run_in_green_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPInvitationChangedState(self, notification):
if self.state == 'terminated':
return
if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'):
contact_header = notification.data.headers.get('Contact', None)
if contact_header and 'isfocus' in contact_header[0].parameters:
self.remote_focus = True
if self.greenlet is not None:
if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting':
self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data))
else:
self._channel.send(notification)
else:
self.greenlet = api.getcurrent()
unhandled_notifications = []
try:
if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal':
self.state = 'received_proposal'
try:
proposed_remote_sdp = self._invitation.sdp.proposed_remote
active_remote_sdp = self._invitation.sdp.active_remote
if len(proposed_remote_sdp.media) < len(active_remote_sdp.media):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')])
self.state = 'connected'
return
for stream in self.streams:
if not stream.validate_update(proposed_remote_sdp, stream.index):
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)])
self.state = 'connected'
return
added_media_indexes = set()
removed_media_indexes = set()
reused_media_indexes = set()
for index, media_stream in enumerate(proposed_remote_sdp.media):
if index >= len(active_remote_sdp.media):
added_media_indexes.add(index)
elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0:
removed_media_indexes.add(index)
elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0:
reused_media_indexes.add(index)
elif media_stream.media != active_remote_sdp.media[index].media:
added_media_indexes.add(index)
removed_media_indexes.add(index)
if added_media_indexes | reused_media_indexes and removed_media_indexes:
engine = Engine()
self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')])
self.state = 'connected'
return
elif added_media_indexes | reused_media_indexes:
self.proposed_streams = []
for index in added_media_indexes | reused_media_indexes:
media_stream = proposed_remote_sdp.media[index]
if media_stream.port != 0:
for stream_type in MediaStreamRegistry:
try:
stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index)
except UnknownStreamError:
continue
except InvalidStreamError as e:
log.error("Invalid stream: {}".format(e))
break
except Exception as e:
log.exception("Exception occurred while setting up stream from SDP: {}".format(e))
break
else:
stream.index = index
self.proposed_streams.append(stream)
break
if self.proposed_streams:
self._invitation.send_response(100)
notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:]))
else:
self._invitation.send_response(488)
self.state = 'connected'
return
else:
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes]
prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
for stream in removed_streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
media = local_sdp.media[stream.index]
media.port = 0
media.attributes = []
media.bandwidth_info = []
for stream in self.streams:
local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index)
try:
self._invitation.send_response(200, sdp=local_sdp)
except PJSIPError:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
if removed_streams:
self.end()
return
else:
try:
self._invitation.send_response(488)
except PJSIPError:
self.end()
return
else:
for stream in removed_streams:
self.streams.remove(stream)
stream.end()
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
# TODO
pass
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
else:
unhandled_notifications.append(notification)
on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote)
if on_hold_streams != prev_on_hold_streams:
hold_supported_streams = (stream for stream in self.streams if stream.hold_supported)
notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams),
partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams)))
if removed_media_indexes:
notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams))
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.greenlet = None
self.state = 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
self.end()
else:
self.state = 'connected'
elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request':
self.state = 'received_proposal_request'
try:
# An empty proposal was received, generate an offer
self._invitation.send_response(100)
local_sdp = SDPSession.new(self._invitation.sdp.active_local)
local_sdp.version += 1
connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip)
if local_sdp.connection is not None:
local_sdp.connection.address = connection_address
for index, stream in enumerate(self.streams):
stream.reset(index)
media = stream.get_local_media(remote_sdp=None, index=index)
if media.connection is not None:
media.connection.address = connection_address
local_sdp.media[stream.index] = media
self._invitation.send_response(200, sdp=local_sdp)
received_invitation_state = False
received_sdp_update = False
while not received_sdp_update or not received_invitation_state or self._channel:
notification = self._channel.wait()
if notification.name == 'SIPInvitationGotSDPUpdate':
received_sdp_update = True
if notification.data.succeeded:
local_sdp = notification.data.local_sdp
remote_sdp = notification.data.remote_sdp
for stream in self.streams:
stream.update(local_sdp, remote_sdp, stream.index)
else:
# TODO
pass
elif notification.name == 'SIPInvitationChangedState':
if notification.data.state == 'connected' and notification.data.sub_state == 'normal':
received_invitation_state = True
elif notification.data.state == 'disconnected':
raise InvitationDisconnectedError(notification.sender, notification.data)
else:
unhandled_notifications.append(notification)
else:
unhandled_notifications.append(notification)
- except InvitationDisconnectedError, e:
+ except InvitationDisconnectedError as e:
self.greenlet = None
self.state = 'connected'
notification = Notification('SIPInvitationChangedState', e.invitation, e.data)
notification.center = NotificationCenter()
self.handle_notification(notification)
except SIPCoreError:
raise # FIXME
else:
self.state = 'connected'
elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal':
if notification.data.originator == 'local' and notification.data.code == 487:
self.state = 'connected'
proposed_streams = self.proposed_streams[:]
self.proposed_streams = None
notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams))
if self._hold_in_progress:
self._send_hold()
elif notification.data.state == 'disconnected':
if self.state == 'incoming':
self.state = 'terminated'
if notification.data.originator == 'remote':
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
# There must have been an error involved
notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None))
else:
notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator))
for stream in self.streams:
notification.center.remove_observer(self, sender=stream)
stream.deactivate()
stream.end()
self.state = 'terminated'
self.end_time = ISOTimestamp.now()
notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason))
notification.center.remove_observer(self, sender=self._invitation)
finally:
self.greenlet = None
for notification in unhandled_notifications:
self.handle_notification(notification)
def _NH_SIPInvitationGotSDPUpdate(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_SIPInvitationTransferNewIncoming(self, notification):
self._invitation.notify_transfer_progress(500)
def _NH_RTPStreamDidEnableEncryption(self, notification):
if notification.sender.type != 'audio':
return
audio_stream = notification.sender
if audio_stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
video_stream = next(stream for stream in self.streams or [] if stream.type=='video')
except StopIteration:
return
if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active:
video_stream.encryption.zrtp._enable(audio_stream)
def _NH_MediaStreamDidStart(self, notification):
stream = notification.sender
if stream.type == 'audio' and stream.encryption.type == 'ZRTP':
stream.encryption.zrtp._enable()
elif stream.type == 'video' and stream.encryption.type == 'ZRTP':
# start ZRTP on the video stream, if applicable
try:
audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio')
except StopIteration:
pass
else:
if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active:
stream.encryption.zrtp._enable(audio_stream)
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidInitialize(self, notification):
if self.greenlet is not None:
self._channel.send(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
if self.greenlet is not None and self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data))
def _NH_MediaStreamDidFail(self, notification):
if self.greenlet is not None:
if self.state not in ('terminating', 'terminated'):
self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data))
else:
stream = notification.sender
if self.streams == [stream]:
self.end()
else:
try:
self.remove_stream(stream)
except IllegalStateError:
self.end()
class SessionManager(object):
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
self.sessions = []
self.state = None
self._channel = coros.queue()
def start(self):
self.state = 'starting'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillStart', sender=self)
notification_center.add_observer(self, 'SIPInvitationChangedState')
notification_center.add_observer(self, 'SIPSessionNewIncoming')
notification_center.add_observer(self, 'SIPSessionNewOutgoing')
notification_center.add_observer(self, 'SIPSessionDidFail')
notification_center.add_observer(self, 'SIPSessionDidEnd')
self.state = 'started'
notification_center.post_notification('SIPSessionManagerDidStart', sender=self)
def stop(self):
self.state = 'stopping'
notification_center = NotificationCenter()
notification_center.post_notification('SIPSessionManagerWillEnd', sender=self)
for session in self.sessions:
session.end()
while self.sessions:
self._channel.wait()
notification_center.remove_observer(self, 'SIPInvitationChangedState')
notification_center.remove_observer(self, 'SIPSessionNewIncoming')
notification_center.remove_observer(self, 'SIPSessionNewOutgoing')
notification_center.remove_observer(self, 'SIPSessionDidFail')
notification_center.remove_observer(self, 'SIPSessionDidEnd')
self.state = 'stopped'
notification_center.post_notification('SIPSessionManagerDidEnd', sender=self)
@run_in_twisted_thread
def handle_notification(self, notification):
if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming':
account_manager = AccountManager()
account = account_manager.find_account(notification.data.request_uri)
if account is None:
account = DefaultAccount()
notification.sender.send_response(100)
session = Session(account)
session.init_incoming(notification.sender, notification.data)
elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'):
self.sessions.append(notification.sender)
elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'):
self.sessions.remove(notification.sender)
if self.state == 'stopping':
self._channel.send(notification)
diff --git a/sylk/streams.py b/sylk/streams.py
index a82badd..1e9bb99 100644
--- a/sylk/streams.py
+++ b/sylk/streams.py
@@ -1,382 +1,382 @@
import random
from collections import defaultdict
from functools import partial
from application.notification import NotificationCenter, NotificationData
from eventlib import api
from eventlib.coros import queue
from eventlib.proc import spawn, ProcExit
from msrplib.connect import DirectConnector, DirectAcceptor
from msrplib.protocol import URI, FailureReportHeader, SuccessReportHeader, UseNicknameHeader
from msrplib.session import contains_mime_type, MSRPSession
from msrplib.transport import make_response
from sipsimple.core import SDPAttribute
from sipsimple.payloads import ParserError
from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType
from sipsimple.streams import InvalidStreamError, UnknownStreamError
from sipsimple.streams.msrp import MSRPStreamBase as _MSRPStreamBase, MSRPStreamError, NotificationProxyLogger
from sipsimple.streams.msrp.chat import ChatStream as _ChatStream, ChatStreamError, ChatIdentity, Message, QueuedMessage, CPIMPayload, CPIMParserError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from sylk.configuration import SIPConfig
@run_in_green_thread
def MSRPStreamBase_initialize(self, session, direction):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
try:
self.session = session
self.transport = self.session.account.msrp.transport
outgoing = direction=='outgoing'
logger = NotificationProxyLogger()
if self.session.account.msrp.connection_model == 'relay':
if not outgoing and self.remote_role in ('actpass', 'passive'):
# 'passive' not allowed by the RFC but play nice for interoperability. -Saul
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
elif not outgoing:
if self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key):
raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate")
self.msrp_connector = DirectAcceptor(logger=logger)
self.local_role = 'passive'
else:
# outgoing
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
else:
if not outgoing and self.remote_role in ('actpass', 'passive'):
# 'passive' not allowed by the RFC but play nice for interoperability. -Saul
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
else:
if not outgoing and self.transport=='tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key):
raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate")
self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True)
self.local_role = 'actpass' if outgoing else 'passive'
full_local_path = self.msrp_connector.prepare(local_uri=URI(host=SIPConfig.local_ip.normalized, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials))
self.local_media = self._create_local_media(full_local_path)
- except Exception, e:
+ except Exception as e:
notification_center.post_notification('MediaStreamDidNotInitialize', self, NotificationData(reason=str(e)))
else:
self._initialized = True
notification_center.post_notification('MediaStreamDidInitialize', self)
finally:
self.greenlet = None
# Monkey-patch the initialize method (needed because we want every MSRP based stream to behave this way, including file transfers)
#
_MSRPStreamBase.initialize = MSRPStreamBase_initialize
class ChatStream(_MSRPStreamBase):
type = 'chat'
priority = _ChatStream.priority + 1
msrp_session_class = MSRPSession
media_type = 'message'
accept_types = ['message/cpim']
accept_wrapped_types = ['*']
supported_chatroom_capabilities = ['nickname', 'private-messages', 'com.ag-projects.screen-sharing', 'com.ag-projects.zrtp-sas']
def __init__(self):
super(ChatStream, self).__init__(direction='sendrecv')
self.message_queue = queue()
self.sent_messages = set()
self.incoming_queue = defaultdict(list)
self.message_queue_thread = None
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
remote_stream = remote_sdp.media[stream_index]
if remote_stream.media != 'message':
raise UnknownStreamError
expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP'
if remote_stream.transport != expected_transport:
raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport))
if remote_stream.formats != ['*']:
raise InvalidStreamError("wrong format list specified")
stream = cls()
stream.remote_role = remote_stream.attributes.getfirst('setup', 'active')
if remote_stream.direction != 'sendrecv':
raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction)
remote_accept_types = remote_stream.attributes.getfirst('accept-types')
if remote_accept_types is None:
raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute")
if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.split()):
raise InvalidStreamError("no compatible media types found")
return stream
@property
def local_identity(self):
try:
return ChatIdentity(self.session.local_identity.uri, self.session.account.display_name)
except AttributeError:
return None
@property
def remote_identity(self):
try:
return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name)
except AttributeError:
return None
@property
def private_messages_allowed(self):
return 'private-messages' in self.chatroom_capabilities
@property
def nickname_allowed(self):
return 'nickname' in self.chatroom_capabilities
@property
def chatroom_capabilities(self):
try:
if self.session.local_focus:
return ' '.join(self.local_media.attributes.getall('chatroom')).split()
elif self.session.remote_focus:
return ' '.join(self.remote_media.attributes.getall('chatroom')).split()
except AttributeError:
pass
return []
def _NH_MediaStreamDidStart(self, notification):
self.message_queue_thread = spawn(self._message_queue_handler)
def _NH_MediaStreamDidNotInitialize(self, notification):
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed')
notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _NH_MediaStreamDidEnd(self, notification):
if self.message_queue_thread is not None:
self.message_queue_thread.kill()
else:
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _create_local_media(self, uri_path):
local_media = super(ChatStream, self)._create_local_media(uri_path)
if self.session.local_focus and self.supported_chatroom_capabilities:
local_media.attributes.append(SDPAttribute('chatroom', ' '.join(self.supported_chatroom_capabilities)))
return local_media
def _handle_REPORT(self, chunk):
# in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message.
if chunk.message_id in self.sent_messages:
self.sent_messages.remove(chunk.message_id)
notification_center = NotificationCenter()
data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment)
if chunk.status.code == 200:
notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data)
else:
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _handle_SEND(self, chunk):
# This ChatStream doesn't send MSRP REPORT chunks automatically, the developer needs to manually send them
if chunk.size == 0:
# keep-alive
self.msrp_session.send_report(chunk, 200, 'OK')
return
if self.direction == 'sendonly':
self.msrp_session.send_report(chunk, 413, 'Unwanted Message')
return
if chunk.content_type.lower() != 'message/cpim':
self.incoming_queue.pop(chunk.message_id, None)
self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type')
return
if chunk.contflag == '#':
self.incoming_queue.pop(chunk.message_id, None)
self.msrp_session.send_report(chunk, 200, 'OK')
return
elif chunk.contflag == '+':
self.incoming_queue[chunk.message_id].append(chunk.data)
self.msrp_session.send_report(chunk, 200, 'OK')
return
else:
data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data
try:
payload = CPIMPayload.decode(data)
except CPIMParserError:
self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error')
return
message = Message(**{name: getattr(payload, name) for name in Message.__slots__})
if not contains_mime_type(self.accept_wrapped_types, message.content_type):
self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type')
return
if message.timestamp is None:
message.timestamp = ISOTimestamp.now()
if message.sender is None:
message.sender = self.remote_identity
if payload.charset is not None:
message.content = message.content.decode(payload.charset)
private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity
notification_center = NotificationCenter()
if message.content_type.lower() == IsComposingDocument.content_type:
try:
data = IsComposingDocument.parse(message.content)
except ParserError as e:
self.msrp_session.send_report(chunk, 400, str(e))
return
ndata = NotificationData(state=data.state.value,
refresh=data.refresh.value if data.refresh is not None else 120,
content_type=data.content_type.value if data.content_type is not None else None,
last_active=data.last_active.value if data.last_active is not None else None,
sender=message.sender,
recipients=message.recipients,
private=private,
chunk=chunk)
notification_center.post_notification('ChatStreamGotComposingIndication', self, ndata)
else:
ndata = NotificationData(message=message,
private=private,
chunk=chunk)
notification_center.post_notification('ChatStreamGotMessage', self, ndata)
def _handle_NICKNAME(self, chunk):
nickname = chunk.headers['Use-Nickname'].decoded
NotificationCenter().post_notification('ChatStreamGotNicknameRequest', self, NotificationData(nickname=nickname, chunk=chunk))
def _on_transaction_response(self, message_id, response):
if message_id in self.sent_messages and response.code != 200:
self.sent_messages.remove(message_id)
data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)
NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _on_nickname_transaction_response(self, message_id, response):
notification_center = NotificationCenter()
if response.code == 200:
notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response))
else:
notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment))
def _message_queue_handler(self):
notification_center = NotificationCenter()
try:
while True:
message = self.message_queue.wait()
if self.msrp_session is None:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
break
try:
if isinstance(message.content, unicode):
message.content = message.content.encode('utf8')
charset = 'utf8'
else:
charset = None
message.sender = message.sender or self.local_identity
message.recipients = message.recipients or [self.remote_identity]
message.timestamp = message.timestamp or ISOTimestamp.now()
payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__})
- except ChatStreamError, e:
+ except ChatStreamError as e:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0])
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
continue
else:
content, content_type = payload.encode()
message_id = message.id
notify_progress = message.notify_progress
report = 'yes' if notify_progress else 'no'
chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id)
chunk.add_header(FailureReportHeader(report))
chunk.add_header(SuccessReportHeader(report))
try:
self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id))
- except Exception, e:
+ except Exception as e:
if notify_progress:
data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e))
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
except ProcExit:
if notify_progress:
data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
raise
else:
if notify_progress:
self.sent_messages.add(message_id)
notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk))
finally:
self.message_queue_thread = None
while self.sent_messages:
message_id = self.sent_messages.pop()
data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
@run_in_twisted_thread
def _enqueue_message(self, message):
if self._done:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
else:
self.message_queue.send(message)
@run_in_green_thread
def _send_nickname_response(self, chunk, code, reason):
response = make_response(chunk, code, reason)
try:
self.msrp_session.send_chunk(response)
except Exception:
pass
def accept_nickname(self, chunk):
if chunk.method != 'NICKNAME':
raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method)
self._send_nickname_response(chunk, 200, 'OK')
def reject_nickname(self, chunk, code, reason):
if chunk.method != 'NICKNAME':
raise ValueError('Incorrect chunk method for accept_nickname: %s' % chunk.method)
self._send_nickname_response(chunk, code, reason)
def send_message(self, content, content_type='text/plain', sender=None, recipients=None, timestamp=None, additional_headers=None, message_id=None, notify_progress=True):
message = QueuedMessage(content, content_type, sender=sender, recipients=recipients, timestamp=timestamp, additional_headers=additional_headers, id=message_id, notify_progress=notify_progress)
self._enqueue_message(message)
return message.id
def send_composing_indication(self, state, refresh=None, last_active=None, sender=None, recipients=None, message_id=None, notify_progress=False):
content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text'))
message = QueuedMessage(content, IsComposingDocument.content_type, sender=sender, recipients=recipients, id=message_id, notify_progress=notify_progress)
self._enqueue_message(message)
return message.id
@run_in_green_thread
def _set_local_nickname(self, nickname, message_id):
if self.msrp_session is None:
# should we generate ChatStreamDidNotSetNickname here?
return
chunk = self.msrp.make_request('NICKNAME')
chunk.add_header(UseNicknameHeader(nickname or u''))
try:
self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id))
- except Exception, e:
+ except Exception as e:
self._failure_reason = str(e)
NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason))
def set_local_nickname(self, nickname):
if not self.nickname_allowed:
raise ChatStreamError('Setting nickname is not supported')
message_id = '%x' % random.getrandbits(64)
self._set_local_nickname(nickname, message_id)
return message_id
diff --git a/sylk/tls.py b/sylk/tls.py
index 6106ae1..f6dc369 100644
--- a/sylk/tls.py
+++ b/sylk/tls.py
@@ -1,50 +1,50 @@
"""TLS helper classes"""
__all__ = ['Certificate', 'PrivateKey']
from gnutls.crypto import X509Certificate, X509PrivateKey
from application import log
from application.process import process
def file_content(file):
path = process.config_file(file)
if path is None:
raise Exception("File '%s' does not exist" % file)
try:
f = open(path, 'rt')
except Exception:
raise Exception("File '%s' could not be open" % file)
try:
return f.read()
finally:
f.close()
class Certificate(object):
"""Configuration data type. Used to create a gnutls.crypto.X509Certificate object
from a file given in the configuration file."""
def __new__(cls, value):
if isinstance(value, basestring):
try:
return X509Certificate(file_content(value))
- except Exception, e:
+ except Exception as e:
log.warn("Certificate file '%s' could not be loaded: %s" % (value, str(e)))
return None
else:
raise TypeError('value should be a string')
class PrivateKey(object):
"""Configuration data type. Used to create a gnutls.crypto.X509PrivateKey object
from a file given in the configuration file."""
def __new__(cls, value):
if isinstance(value, basestring):
try:
return X509PrivateKey(file_content(value))
- except Exception, e:
+ except Exception as e:
log.warn("Private key file '%s' could not be loaded: %s" % (value, str(e)))
return None
else:
raise TypeError('value should be a string')

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:03 AM (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408771
Default Alt Text
(369 KB)

Event Timeline