Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/__init__.py b/sylk/applications/__init__.py
index 3ea8dd5..79608a8 100644
--- a/sylk/applications/__init__.py
+++ b/sylk/applications/__init__.py
@@ -1,306 +1,306 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details
#
__all__ = ['ISylkApplication', 'ApplicationRegistry', 'SylkApplication', 'IncomingRequestHandler', 'ApplicationLogger']
import abc
import os
import socket
import struct
import sys
from application import log
from application.configuration.datatypes import NetworkRange
from application.notification import IObserver, NotificationCenter
from application.python import Null
from application.python.types import Singleton
from itertools import chain
from sipsimple.threading import run_in_twisted_thread
from zope.interface import implements
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
SYLK_APP_HEADER = 'X-Sylk-App'
class ApplicationRegistry(object):
    """Ordered collection of the application classes known to the server.

    Uses the Singleton metaclass, so callers simply instantiate it wherever
    the registry is needed.
    """

    __metaclass__ = Singleton

    def __init__(self):
        self.applications = []

    def __iter__(self):
        """Iterate over the registered applications in registration order."""
        return iter(self.applications)

    def add(self, app):
        """Register app; registering the same object twice is a no-op."""
        if app in self.applications:
            return
        self.applications.append(app)
class ApplicationName(object):
    """Descriptor that derives an application name from the owner class name.

    The name is the lower-cased class name with a trailing 'Application'
    removed when present.
    """

    def __get__(self, obj, objtype):
        suffix = 'Application'
        class_name = objtype.__name__
        if class_name.endswith(suffix):
            class_name = class_name[:-len(suffix)]
        return class_name.lower()
class SylkApplicationMeta(abc.ABCMeta, Singleton):
    """Metaclass for SylkServer applications.

    Combines abc.ABCMeta with Singleton and adds every class it creates,
    except the one named 'SylkApplication', to the ApplicationRegistry.
    """

    def __init__(cls, name, bases, dic):
        super(SylkApplicationMeta, cls).__init__(name, bases, dic)
        if name == 'SylkApplication':
            # The abstract base class is not itself an application
            return
        ApplicationRegistry().add(cls)
class SylkApplication(object):
    """Abstract base class that every SylkServer application derives from.

    Subclasses must implement all the methods below; the incoming_* hooks are
    the ones IncomingRequestHandler calls once a request has been authorized
    and matched to the application.
    """

    __metaclass__ = SylkApplicationMeta
    __appname__ = ApplicationName()

    @abc.abstractmethod
    def start(self):
        """Start the application."""

    @abc.abstractmethod
    def stop(self):
        """Stop the application."""

    @abc.abstractmethod
    def incoming_session(self, session):
        """Handle a new incoming session."""

    @abc.abstractmethod
    def incoming_subscription(self, subscribe_request, data):
        """Handle an incoming SUBSCRIBE; data is the notification data of the request."""

    @abc.abstractmethod
    def incoming_referral(self, refer_request, data):
        """Handle an incoming REFER; data is the notification data of the request."""

    @abc.abstractmethod
    def incoming_sip_message(self, message_request, data):
        """Handle an incoming SIP MESSAGE; data is the notification data of the request."""
def load_builtin_applications():
    """Import every enabled application package located next to this module.

    A sub-directory counts as an application when it contains an __init__.py;
    names listed in ServerConfig.disabled_applications are skipped.
    """
    toplevel = os.path.dirname(__file__)
    app_list = []
    for entry in os.listdir(toplevel):
        entry_path = os.path.join(toplevel, entry)
        if os.path.isdir(entry_path) and '__init__.py' in os.listdir(entry_path):
            app_list.append(entry)
    enabled = set(app_list).difference(ServerConfig.disabled_applications)
    module_names = ['sylk.applications.%s' % name for name in enabled]
    for module_name in module_names:
        __import__(module_name)
def load_extra_applications():
    """Import the enabled applications from ServerConfig.extra_applications_dir.

    Does nothing when the setting is empty or does not point to a directory.
    The directory is appended to sys.path so that its packages can be
    imported by their bare names.
    """
    if not ServerConfig.extra_applications_dir:
        return
    toplevel = os.path.realpath(os.path.abspath(ServerConfig.extra_applications_dir.normalized))
    if not os.path.isdir(toplevel):
        return
    app_list = []
    for entry in os.listdir(toplevel):
        entry_path = os.path.join(toplevel, entry)
        if os.path.isdir(entry_path) and '__init__.py' in os.listdir(entry_path):
            app_list.append(entry)
    sys.path.append(toplevel)
    for module_name in set(app_list).difference(ServerConfig.disabled_applications):
        __import__(module_name)
def load_applications():
    """Import the builtin and extra applications, then instantiate each registered one."""
    load_builtin_applications()
    load_extra_applications()
    for application_class in ApplicationRegistry():
        application_class()
class ApplicationNotLoadedError(Exception):
    """Raised when the application selected for a request is not in the registry."""
class IncomingRequestHandler(object):
"""
Handle incoming requests and match them to applications.
"""
__metaclass__ = Singleton
implements(IObserver)
def __init__(self):
    """Load the applications and build the routing state.

    application_map is built from the 'key:application' entries of
    ServerConfig.application_map.
    """
    load_applications()
    loaded_names = [app.__appname__ for app in ApplicationRegistry()]
    log.msg('Loaded applications: %s' % ', '.join(loaded_names))
    self.application_map = dict(entry.split(':') for entry in ServerConfig.application_map)
    self.authorization_handler = AuthorizationHandler()
def start(self):
    """Start every application and the authorization handler, then observe incoming requests."""
    for application_class in ApplicationRegistry():
        application_class().start()
    self.authorization_handler.start()
    notification_center = NotificationCenter()
    observed_names = ('SIPSessionNewIncoming',
                      'SIPIncomingSubscriptionGotSubscribe',
                      'SIPIncomingReferralGotRefer',
                      'SIPIncomingRequestGotRequest')
    for notification_name in observed_names:
        notification_center.add_observer(self, name=notification_name)
def stop(self):
    """Stop the authorization handler, stop observing incoming requests and stop every application."""
    self.authorization_handler.stop()
    notification_center = NotificationCenter()
    observed_names = ('SIPSessionNewIncoming',
                      'SIPIncomingSubscriptionGotSubscribe',
                      'SIPIncomingReferralGotRefer',
                      'SIPIncomingRequestGotRequest')
    for notification_name in observed_names:
        notification_center.remove_observer(self, name=notification_name)
    for application_class in ApplicationRegistry():
        application_class().stop()
def get_application(self, ruri, headers):
# Return the application instance that should handle a request.
#
# ruri    -- the request URI; its .user and .host attributes are read
# headers -- the request headers; looked up by name with `in` and []
#
# Raises ApplicationNotLoadedError (after logging an error) when no
# registered application has the selected name.
#
# The X-Sylk-App header, when present, names the application (its body with
# surrounding whitespace stripped); otherwise the configured default is used.
if SYLK_APP_HEADER in headers:
application = headers[SYLK_APP_HEADER].body.strip()
else:
application = ServerConfig.default_application
# NOTE(review): indentation is lost in this view, so it cannot be told here
# whether the application_map lookup below only refines the default or also
# overrides an application selected through the header -- confirm against
# the real file before restructuring.
if self.application_map:
# Most specific key first: user@host, then host, then user. First match wins.
prefixes = ("%s@%s" % (ruri.user, ruri.host), ruri.host, ruri.user)
for prefix in prefixes:
if prefix in self.application_map:
application = self.application_map[prefix]
break
try:
# The two lines below are the removed (-) and added (+) sides of the diff
# hunk: the generator's .next() method is replaced by the next() builtin.
# Both raise StopIteration when no registered application matches.
- app = (app for app in ApplicationRegistry() if app.__appname__ == application).next()
+ app = next(app for app in ApplicationRegistry() if app.__appname__ == application)
except StopIteration:
log.error('Application %s is not loaded' % application)
raise ApplicationNotLoadedError
else:
# The registry stores classes; calling the class yields the instance
return app()
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch the notification to the _NH_<name> method, ignoring names without a handler."""
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPSessionNewIncoming(self, notification):
    """Authorize a new incoming session and hand it to the matching application.

    The session is rejected with 403 when its source is not authorized and
    with 404 when the selected application is not loaded.
    """
    session = notification.sender
    source_ip = session.peer_address.ip
    try:
        self.authorization_handler.authorize_source(source_ip)
    except UnauthorizedRequest:
        session.reject(403)
        return
    request_uri = session._invitation.request_uri
    try:
        app = self.get_application(request_uri, notification.data.headers)
    except ApplicationNotLoadedError:
        session.reject(404)
        return
    app.incoming_session(session)
def _NH_SIPIncomingSubscriptionGotSubscribe(self, notification):
    """Authorize an incoming SUBSCRIBE and hand it to the matching application.

    The request is rejected with 403 when its source is not authorized and
    with 404 when the selected application is not loaded.
    """
    subscribe_request = notification.sender
    data = notification.data
    try:
        self.authorization_handler.authorize_source(subscribe_request.peer_address.ip)
    except UnauthorizedRequest:
        subscribe_request.reject(403)
        return
    try:
        app = self.get_application(data.request_uri, data.headers)
    except ApplicationNotLoadedError:
        subscribe_request.reject(404)
        return
    app.incoming_subscription(subscribe_request, notification.data)
def _NH_SIPIncomingReferralGotRefer(self, notification):
    """Authorize an incoming REFER and hand it to the matching application.

    The request is rejected with 403 when its source is not authorized and
    with 404 when the selected application is not loaded.
    """
    refer_request = notification.sender
    data = notification.data
    try:
        self.authorization_handler.authorize_source(refer_request.peer_address.ip)
    except UnauthorizedRequest:
        refer_request.reject(403)
        return
    try:
        app = self.get_application(data.request_uri, data.headers)
    except ApplicationNotLoadedError:
        refer_request.reject(404)
        return
    app.incoming_referral(refer_request, notification.data)
def _NH_SIPIncomingRequestGotRequest(self, notification):
    """Authorize an incoming out-of-dialog request and hand it to the matching application.

    Only the MESSAGE method is accepted; anything else is answered with 405.
    The request is answered with 403 when its source is not authorized and
    with 404 when the selected application is not loaded.
    """
    request = notification.sender
    data = notification.data
    if data.method != 'MESSAGE':
        request.answer(405)
        return
    try:
        self.authorization_handler.authorize_source(request.peer_address.ip)
    except UnauthorizedRequest:
        request.answer(403)
        return
    try:
        app = self.get_application(data.request_uri, data.headers)
    except ApplicationNotLoadedError:
        request.answer(404)
        return
    app.incoming_sip_message(request, notification.data)
class UnauthorizedRequest(Exception):
    """Raised by AuthorizationHandler.authorize_source when a request source is not trusted."""
class AuthorizationHandler(object):
    """Decide whether the source address of a request belongs to a trusted network.

    The trusted networks are SIPConfig.trusted_peers, or, when
    ThorNodeConfig.enabled is set, the node list received through
    ThorNetworkGotUpdate notifications.
    """

    implements(IObserver)

    def __init__(self):
        self.state = None
        self.trusted_peers = SIPConfig.trusted_peers
        self.thor_nodes = []

    @property
    def trusted_parties(self):
        """The network ranges currently used for authorization."""
        if ThorNodeConfig.enabled:
            return self.thor_nodes
        return self.trusted_peers

    def start(self):
        """Begin observing Thor network updates and accept authorization requests."""
        NotificationCenter().add_observer(self, name='ThorNetworkGotUpdate')
        self.state = 'started'

    def stop(self):
        """Refuse further authorization requests and stop observing Thor network updates."""
        self.state = 'stopped'
        NotificationCenter().remove_observer(self, name='ThorNetworkGotUpdate')

    def authorize_source(self, ip_address):
        """Return True when ip_address falls inside one of the trusted ranges.

        Each trusted range is indexed as (network address, netmask), both as
        32 bit integers.

        Raises UnauthorizedRequest when the handler is not started, when the
        address cannot be parsed as a dotted IPv4 address or when no trusted
        range contains it.
        """
        if self.state != 'started':
            raise UnauthorizedRequest
        try:
            # Parsed once, outside the loop, as it does not depend on the range
            address = struct.unpack('!L', socket.inet_aton(ip_address))[0]
        except socket.error:
            # inet_aton only understands IPv4; callers only handle
            # UnauthorizedRequest, so report an unparsable source as untrusted
            raise UnauthorizedRequest
        for network_range in self.trusted_parties:
            if address & network_range[1] == network_range[0]:
                return True
        raise UnauthorizedRequest

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Dispatch the notification to the _NH_<name> method, ignoring names without a handler."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_ThorNetworkGotUpdate(self, notification):
        """Replace the Thor node list with the nodes of all networks in the update."""
        thor_nodes = []
        for node in chain(*(n.nodes for n in notification.data.networks.values())):
            thor_nodes.append(NetworkRange(node))
        # Swapped in at the end so a failure above leaves the old list in place
        self.thor_nodes = thor_nodes
class ApplicationLogger(object):
    """Logging facade that prepends '[<prefix>] ' to every message before delegating to log."""

    __metaclass__ = Singleton

    @classmethod
    def for_package(cls, package):
        """Return the logger whose prefix is the last component of a dotted package name."""
        package_name = package.split('.')[-1]
        return cls(package_name)

    def __init__(self, prefix):
        self.prefix = '[%s] ' % prefix

    def info(self, message, **context):
        log.info(self.prefix + message, **context)

    def warning(self, message, **context):
        log.warning(self.prefix + message, **context)

    def debug(self, message, **context):
        log.debug(self.prefix + message, **context)

    def error(self, message, **context):
        log.error(self.prefix + message, **context)

    def critical(self, message, **context):
        log.critical(self.prefix + message, **context)

    def exception(self, message=None, **context):
        """Log the current exception; the prefix is only added when a message is given."""
        prefixed = None if message is None else self.prefix + message
        log.exception(prefixed, **context)

    # Some aliases that are commonly used
    msg = info
    warn = warning
    fatal = critical
    err = exception
diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py
index bec96d7..3875462 100644
--- a/sylk/applications/conference/__init__.py
+++ b/sylk/applications/conference/__init__.py
@@ -1,430 +1,430 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details
#
import mimetypes
import os
import re
from application.notification import IObserver, NotificationCenter
from application.python import Null
from gnutls.interfaces.twisted import X509Credentials
from sipsimple.account import AccountManager
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError, Header, ContactHeader, FromHeader, ToHeader
from sipsimple.lookup import DNSLookup
from sipsimple.streams import AudioStream
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications import SylkApplication
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.database import initialize as init_database
from sylk.applications.conference.logger import log
from sylk.applications.conference.room import Room
from sylk.applications.conference.web import ScreenSharingWebServer
from sylk.bonjour import BonjourServices
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
from sylk.extensions import ChatStream
from sylk.session import Session
from sylk.tls import Certificate, PrivateKey
class ACLValidationError(Exception):
    """Raised by ConferenceApplication.validate_acl when the room access list denies a URI."""
class RoomNotFoundError(Exception):
    """Raised by ConferenceApplication.get_room when the room does not exist and is not to be created."""
class ConferenceApplication(SylkApplication):
implements(IObserver)
def __init__(self):
    """Create the empty application state; services are only set up in start()."""
    # Rooms keyed by 'user@host'
    self._rooms = {}
    # Incoming sessions that were not accepted yet
    self.pending_sessions = []
    # Per room, the participants invited by the server (they skip the ACL on SUBSCRIBE)
    self.invited_participants_map = {}
    # Placeholders until start() creates the real services
    self.screen_sharing_web_server = None
    self.bonjour_focus_service = Null()
    self.bonjour_room_service = Null()
def start(self):
    """Initialize the database, publish the Bonjour services and start the screen sharing web server.

    The Bonjour services are only published when ServerConfig.enable_bonjour
    is set. The web server is given TLS credentials when HTTPS is enabled
    and a certificate is configured.
    """
    init_database()

    if ServerConfig.enable_bonjour:
        focus_service = BonjourServices(service='sipfocus')
        self.bonjour_focus_service = focus_service
        focus_service.start()
        log.msg("Bonjour publication started for service 'sipfocus'")
        room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference')
        self.bonjour_room_service = room_service
        room_service.start()
        room_service.presence_state = BonjourPresenceState('available', u'No participants')
        log.msg("Bonjour publication started for service 'sipuri'")

    web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir)
    self.screen_sharing_web_server = web_server
    credentials = None
    if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None:
        # Certificate and private key are both read from the same configured file
        cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized)
        key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized)
        credentials = X509Credentials(cert, key)
    web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials)
    listen_address = web_server.listener.getHost()
    log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port))
def stop(self):
    """Stop the Bonjour services and the screen sharing web server.

    Safe to call when start() was never called: the web server attribute is
    still None in that case and is skipped.
    """
    self.bonjour_focus_service.stop()
    self.bonjour_room_service.stop()
    if self.screen_sharing_web_server is not None:
        self.screen_sharing_web_server.stop()
def get_room(self, uri, create=False):
    """Return the room for uri, keyed by 'user@host'.

    When the room does not exist it is created and stored if create is true;
    otherwise RoomNotFoundError is raised.
    """
    room_uri = '%s@%s' % (uri.user, uri.host)
    if room_uri in self._rooms:
        return self._rooms[room_uri]
    if not create:
        raise RoomNotFoundError
    room = Room(room_uri)
    self._rooms[room_uri] = room
    return room
def remove_room(self, uri):
    """Forget the room for uri (keyed by 'user@host'); unknown rooms are ignored."""
    room_uri = '%s@%s' % (uri.user, uri.host)
    try:
        del self._rooms[room_uri]
    except KeyError:
        pass
def validate_acl(self, room_uri, from_uri):
    """Raise ACLValidationError when the room configuration denies from_uri.

    With the 'allow,deny' policy a URI must match the allow list and not the
    deny list. With any other policy a URI is only refused when it matches
    the deny list without matching the allow list.
    """
    cfg = get_room_config('%s@%s' % (room_uri.user, room_uri.host))
    if cfg.access_policy == 'allow,deny':
        refused = not cfg.allow.match(from_uri) or cfg.deny.match(from_uri)
    else:
        refused = cfg.deny.match(from_uri) and not cfg.allow.match(from_uri)
    if refused:
        raise ACLValidationError
def incoming_session(self, session):
    """Validate a new incoming session and schedule its acceptance.

    The session is rejected with:
      488 -- when it proposes no audio, chat or file-transfer stream
      403 -- when the room access list denies the caller
      404 -- when a file is requested from a room that does not exist, the
             file is not known to the room or it is gone from the filesystem

    Otherwise the session is added to the pending sessions and accepted with
    the first stream of each supported type: after 4 seconds of ringing when
    audio is proposed, on the next reactor iteration when it is not.
    """
    log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
    audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio']
    chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat']
    transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer']
    if not audio_streams and not chat_streams and not transfer_streams:
        log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported')
        session.reject(488)
        return
    try:
        self.validate_acl(session._invitation.request_uri, session.remote_identity.uri)
    except ACLValidationError:
        log.msg(u'Session rejected: unauthorized by access list')
        session.reject(403)
        return
    # Check if requested files belong to this room
    for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'):
        try:
            room = self.get_room(session._invitation.request_uri)
        except RoomNotFoundError:
            log.msg(u'Session rejected: room not found')
            session.reject(404)
            return
        try:
            requested_file = next(item for item in room.files if item.hash == stream.file_selector.hash)
        except StopIteration:
            log.msg(u'Session rejected: requested file not found')
            session.reject(404)
            return
        filename = os.path.basename(requested_file.name)
        for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)):
            if filename in filenames:
                path = os.path.join(dirpath, filename)
                # Binary mode: the content is sent as-is and its size is taken
                # from fstat, so no newline translation may be applied
                stream.file_selector.fd = open(path, 'rb')
                if stream.file_selector.size is None:
                    stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size
                if stream.file_selector.type is None:
                    mime_type, encoding = mimetypes.guess_type(filename)
                    if encoding is not None:
                        content_type = 'application/x-%s' % encoding
                    elif mime_type is not None:
                        content_type = mime_type
                    else:
                        content_type = 'application/octet-stream'
                    stream.file_selector.type = content_type
                break
        else:
            # File got removed from the filesystem
            log.msg(u'Session rejected: requested file removed from the filesystem')
            session.reject(404)
            return
    self.pending_sessions.append(session)
    NotificationCenter().add_observer(self, sender=session)
    if audio_streams:
        session.send_ring_indication()
    # Accept at most one stream of each type
    streams = [candidates[0] for candidates in (audio_streams, chat_streams, transfer_streams) if candidates]
    reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams)
def incoming_subscription(self, subscribe_request, data):
    """Validate an incoming SUBSCRIBE and pass it to the room it targets.

    Both the request URI and the To header URI are tried, in that order, for
    the access list check, the invited participants check and the room lookup.

    The request is rejected with:
      400 -- when the From or To header is missing
      489 -- when the event is not 'conference'
      403 -- when the access list denies the subscriber, unless the server
             itself invited that subscriber into the room
      480 -- when the room does not exist or is not started
    """
    from_header = data.headers.get('From', Null)
    to_header = data.headers.get('To', Null)
    if Null in (from_header, to_header):
        subscribe_request.reject(400)
        return
    if subscribe_request.event != 'conference':
        log.msg(u'Subscription rejected: only conference event is supported')
        subscribe_request.reject(489)
        return

    candidate_uris = (data.request_uri, to_header.uri)

    for candidate in candidate_uris:
        try:
            self.validate_acl(candidate, from_header.uri)
        except ACLValidationError:
            continue
        break
    else:
        # Check if we need to skip the ACL because this was an invited participant
        subscriber = str(from_header.uri)
        invited = any(subscriber in self.invited_participants_map.get('%s@%s' % (candidate.user, candidate.host), {})
                      for candidate in candidate_uris)
        if not invited:
            log.msg(u'Subscription rejected: unauthorized by access list')
            subscribe_request.reject(403)
            return

    for candidate in candidate_uris:
        try:
            room = self.get_room(candidate)
        except RoomNotFoundError:
            continue
        break
    else:
        log.msg(u'Subscription rejected: room not yet created')
        subscribe_request.reject(480)
        return

    if room.started:
        room.handle_incoming_subscription(subscribe_request, data)
    else:
        log.msg(u'Subscription rejected: room not started yet')
        subscribe_request.reject(480)
def incoming_referral(self, refer_request, data):
    """Validate an incoming REFER and start a handler for it.

    The request is rejected with 400 when the From, To or Refer-To header is
    missing and with 403 when the room access list denies the sender.
    """
    headers = data.headers
    from_header = headers.get('From', Null)
    to_header = headers.get('To', Null)
    refer_to_header = headers.get('Refer-To', Null)
    if Null in (from_header, to_header, refer_to_header):
        refer_request.reject(400)
        return
    room_name = '%s@%s' % (to_header.uri.user, to_header.uri.host)
    log.msg(u'Room %s - join request from %s to %s' % (room_name, from_header.uri, refer_to_header.uri))
    try:
        self.validate_acl(data.request_uri, from_header.uri)
    except ACLValidationError:
        log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri)
        refer_request.reject(403)
        return
    IncomingReferralHandler(refer_request, data).start()
def incoming_sip_message(self, message_request, data):
    """Refuse SIP MESSAGE requests with 405; the conference only handles MSRP chat."""
    unsupported_notice = u'SIP Message is not supported, use MSRP media instead'
    log.msg(unsupported_notice)
    message_request.answer(405)
def accept_session(self, session, streams):
    """Accept the session as conference focus, provided it is still pending."""
    if session not in self.pending_sessions:
        # The session is no longer pending, nothing to accept
        return
    session.accept(streams, is_focus=True)
def add_participant(self, session, room_uri):
    """Add an outgoing session, started on behalf of the room, to that room.

    The room is created and started when needed.
    """
    room_uri_str = '%s@%s' % (room_uri.user, room_uri.host)
    log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri))
    # Keep track of the invited participants, we must skip ACL policy
    # for SUBSCRIBE requests. The value counts the sessions per participant.
    participant = str(session.remote_identity.uri)
    invited = self.invited_participants_map.setdefault(room_uri_str, {})
    invited[participant] = invited.get(participant, 0) + 1
    NotificationCenter().add_observer(self, sender=session)
    room = self.get_room(room_uri, True)
    room.start()
    room.add_session(session)
def remove_participant(self, participant_uri, room_uri):
    """Terminate the sessions of participant_uri in the given room; unknown rooms are ignored."""
    try:
        room = self.get_room(room_uri)
    except RoomNotFoundError:
        return
    log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri))
    room.terminate_sessions(participant_uri)
def handle_notification(self, notification):
    """Dispatch the notification to the _NH_<name> method, ignoring names without a handler."""
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPSessionDidStart(self, notification):
    """Move a started session out of the pending list and into its room, creating the room if needed."""
    session = notification.sender
    self.pending_sessions.remove(session)
    room_uri = session._invitation.request_uri # FIXME
    room = self.get_room(room_uri, True)
    room.start()
    room.add_session(session)
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
# Clean up after an ended session: stop observing it, remove it from its
# room and stop the room when it became empty.
session = notification.sender
notification.center.remove_observer(self, sender=session)
# Incoming sessions identify the room by the request URI of their INVITE,
# outgoing ones (see add_participant) by their local identity.
if session.direction == 'incoming':
room_uri = session._invitation.request_uri # FIXME
else:
# Clear invited participants mapping
room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host)
d = self.invited_participants_map[room_uri_str]
# The value is a per participant session counter; the entry goes away with
# the last session
d[str(session.remote_identity.uri)] -= 1
if d[str(session.remote_identity.uri)] == 0:
del d[str(session.remote_identity.uri)]
room_uri = session.local_identity.uri
# We could get this notification even if we didn't get SIPSessionDidStart
try:
room = self.get_room(room_uri)
except RoomNotFoundError:
return
if session in room.sessions:
room.remove_session(session)
# NOTE(review): indentation is lost in this view, so it cannot be told here
# whether the emptiness check below is nested under the membership check
# above -- confirm against the real file.
if not room.stopping and room.empty:
self.remove_room(room_uri)
room.stop()
def _NH_SIPSessionDidFail(self, notification):
    """Forget a pending session that failed, stop observing it and log the reason."""
    failed_session = notification.sender
    self.pending_sessions.remove(failed_session)
    notification.center.remove_observer(self, sender=failed_session)
    failure_details = (failed_session.remote_identity.uri, notification.data.reason)
    log.msg(u'Session from %s failed: %s' % failure_details)
class IncomingReferralHandler(object):
implements(IObserver)
def __init__(self, refer_request, data):
    """Capture the REFER request and the details needed to act on it.

    refer_to_uri starts as the Refer-To URI with the angle brackets removed;
    method is the upper-cased 'method' parameter of Refer-To, defaulting to
    INVITE.
    """
    refer_to_header = data.headers.get('Refer-To')
    room_uri = data.request_uri
    self._refer_request = refer_request
    self._refer_headers = data.headers
    self.room_uri = room_uri
    self.room_uri_str = '%s@%s' % (room_uri.user, room_uri.host)
    self.refer_to_uri = re.sub('<|>', '', refer_to_header.uri)
    self.method = refer_to_header.parameters.get('method', 'INVITE').upper()
    self.session = None
    self.streams = []
def start(self):
    """Act on the REFER according to its method.

    INVITE -- accept the REFER and look up the route to the referred party
              (or to the outbound proxy when one is configured); the
              invitation continues when the lookup finishes
    BYE    -- accept the REFER, remove the referred party from the room and
              end the REFER with 200
    other  -- reject the REFER with 488

    The REFER is also rejected with 488 when the Refer-To URI cannot be parsed.
    """
    if not self.refer_to_uri.startswith(('sip:', 'sips:')):
        self.refer_to_uri = 'sip:%s' % self.refer_to_uri
    try:
        self.refer_to_uri = SIPURI.parse(self.refer_to_uri)
    except SIPCoreError:
        log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
        self._refer_request.reject(488)
        return

    notification_center = NotificationCenter()
    notification_center.add_observer(self, sender=self._refer_request)

    if self.method not in ('INVITE', 'BYE'):
        self._refer_request.reject(488)
        return

    if self.method == 'BYE':
        log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
        self._refer_request.accept()
        conference_application = ConferenceApplication()
        conference_application.remove_participant(self.refer_to_uri, self.room_uri)
        self._refer_request.end(200)
        return

    # INVITE
    self._refer_request.accept()
    settings = SIPSimpleSettings()
    account = AccountManager().sylkserver_account
    outbound_proxy = account.sip.outbound_proxy
    if outbound_proxy is None:
        uri = self.refer_to_uri
    else:
        uri = SIPURI(host=account.sip.outbound_proxy.host,
                     port=account.sip.outbound_proxy.port,
                     parameters={'transport': account.sip.outbound_proxy.transport})
    lookup = DNSLookup()
    notification_center.add_observer(self, sender=lookup)
    lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
def handle_notification(self, notification):
    """Dispatch the notification to the _NH_<name> method, ignoring names without a handler."""
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_DNSLookupDidSucceed(self, notification):
    """Start the outgoing session to the referred party once its route is known.

    The session proposes the media types currently active in the room (audio
    and/or chat) and presents the room as the caller. The REFER is ended
    with 500 when the room does not exist or has no active media.
    """
    notification_center = NotificationCenter()
    notification_center.remove_observer(self, sender=notification.sender)
    account = AccountManager().sylkserver_account
    conference_application = ConferenceApplication()
    try:
        room = conference_application.get_room(self.room_uri)
    except RoomNotFoundError:
        log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
        # The REFER may have ended while the lookup was in progress
        if self._refer_request is not None:
            self._refer_request.end(500)
        return
    active_media = room.active_media
    if not active_media:
        log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
        if self._refer_request is not None:
            self._refer_request.end(500)
        return
    if 'audio' in active_media:
        self.streams.append(AudioStream())
    if 'chat' in active_media:
        self.streams.append(ChatStream())
    self.session = Session(account)
    notification_center.add_observer(self, sender=self.session)
    original_from_header = self._refer_headers.get('From')
    if original_from_header.display_name:
        original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host)
    else:
        original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host)
    from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call')
    to_header = ToHeader(self.refer_to_uri)
    # The contact uses the transport of the first route; udp needs no parameter
    transport = notification.data.result[0].transport
    parameters = {} if transport=='udp' else {'transport': transport}
    contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters))
    extra_headers = []
    # Pass on the Referred-By of the REFER, or name its sender when there is none
    if self._refer_headers.get('Referred-By', None) is not None:
        extra_headers.append(Header.new(self._refer_headers.get('Referred-By')))
    else:
        extra_headers.append(Header('Referred-By', str(original_from_header.uri)))
    if ThorNodeConfig.enabled:
        extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
    extra_headers.append(Header('X-Originator-From', str(original_from_header.uri)))
    subject = u'Join conference request from %s' % original_identity
    self.session.connect(from_header, to_header, contact_header=contact_header, routes=notification.data.result, streams=self.streams, is_focus=True, subject=subject, extra_headers=extra_headers)
def _NH_DNSLookupDidFail(self, notification):
    """Stop observing the failed lookup."""
    # NOTE(review): the accepted REFER is not ended here -- confirm whether
    # the referrer should receive a final notification on lookup failure
    failed_lookup = notification.sender
    notification.center.remove_observer(self, sender=failed_lookup)
def _NH_SIPSessionGotRingIndication(self, notification):
    """Tell the referrer, if the REFER is still active, that the referred party is ringing."""
    if self._refer_request is None:
        return
    self._refer_request.send_notify(180)
def _NH_SIPSessionGotProvisionalResponse(self, notification):
    """Relay a provisional response code and reason to the referrer, if the REFER is still active."""
    if self._refer_request is None:
        return
    response = notification.data
    self._refer_request.send_notify(response.code, response.reason)
def _NH_SIPSessionDidStart(self, notification):
    """Finish a successful invitation: end the REFER with 200 and hand the session to the conference."""
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(200)
    ConferenceApplication().add_participant(self.session, self.room_uri)
    referrer_uri = self._refer_headers.get('From').uri
    log.msg('Room %s - %s added %s' % (self.room_uri_str, referrer_uri, self.refer_to_uri))
    # The conference application owns the session from now on
    self.session = None
    self.streams = []
def _NH_SIPSessionDidFail(self, notification):
    """Report a failed invitation to the referrer and drop the session.

    The REFER is ended with the failure code (500 when there is none) and
    the failure reason (the code when there is none).
    """
    failure = notification.data
    log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, failure.reason))
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(failure.code or 500, failure.reason or failure.code)
    self.session = None
    self.streams = []
def _NH_SIPSessionDidEnd(self, notification):
    """Handle a session that ended while this handler was still observing it."""
    # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
    log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(200)
    self.session = None
    self.streams = []
def _NH_SIPIncomingReferralDidEnd(self, notification):
    """Stop observing the ended REFER and drop the reference, so no further notifications are sent to it."""
    ended_request = notification.sender
    notification.center.remove_observer(self, sender=ended_request)
    self._refer_request = None
diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py
index 85487a1..94a9a00 100644
--- a/sylk/applications/conference/room.py
+++ b/sylk/applications/conference/room.py
@@ -1,1156 +1,1156 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
import hashlib
import os
import random
import re
import shutil
import string
import weakref
from collections import defaultdict
from datetime import datetime
from glob import glob
from itertools import chain, count, cycle
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.system import makedirs
from eventlib import api, coros, proc
from sipsimple.account import AccountManager
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.application import SIPApplication
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPCoreError, SIPCoreInvalidStateError, SIPURI
from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import conference
from sipsimple.streams import FileTransferStream
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.streams.msrp import ChatStreamError, FileSelector
from sipsimple.threading import run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications.conference import database
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.bonjour import BonjourServices
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
from sylk.configuration.datatypes import ResourcePath, URL
from sylk.session import Session
def format_identity(identity, cpim_format=False):
    """Return a printable form of an identity.

    With a display name the result is 'Name <sip:user@host>'. Without one it
    is 'sip:user@host', wrapped in angle brackets when cpim_format is true.
    """
    user, host = identity.uri.user, identity.uri.host
    if identity.display_name:
        return u'%s <sip:%s@%s>' % (identity.display_name, user, host)
    if cpim_format:
        return u'<sip:%s@%s>' % (user, host)
    return u'sip:%s@%s' % (user, host)
class ScreenImage(object):
def __init__(self, room, sender):
    """Prepare the file name and the public URL of the screen image shared by sender in room.

    The room is only weakly referenced. The file name carries 10 random
    characters after the sender address.
    """
    self.room = weakref.ref(room)
    self.room_uri = room.uri
    self.sender = sender
    random_part = ''.join(random.sample(string.letters+string.digits, 10))
    image_name = '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, random_part)
    self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, image_name)
    # Imported here, not at module level (presumably to avoid an import cycle -- verify)
    from sylk.applications.conference import ConferenceApplication
    port = ConferenceApplication().screen_sharing_web_server.port
    if ConferenceConfig.screen_sharing_use_https:
        scheme = 'https'
    else:
        scheme = 'http'
    self.url = URL('%s://%s:%s/' % (scheme, ConferenceConfig.screen_sharing_ip.normalized, port))
    self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename))
    self.state = None
    self.timer = None
@property
def active(self):
    """True while the state is 'active'."""
    current_state = self.state
    return current_state == 'active'
@property
def idle(self):
    """True while the state is 'idle'."""
    current_state = self.state
    return current_state == 'idle'
@run_in_thread('file-io')
def save(self, image):
# Write the image data to self.filename, then advertise the shared screen.
# Runs through run_in_thread('file-io').
makedirs(os.path.dirname(self.filename))
# Written to a temporary name first and renamed afterwards, so the final
# path is only ever replaced by a fully written file
tmp_filename = self.filename + '.tmp'
try:
with open(tmp_filename, 'wb') as file:
file.write(image)
except EnvironmentError, e:
log.msg('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e))
else:
try:
os.rename(tmp_filename, self.filename)
except EnvironmentError:
# A failed rename is ignored without logging
pass
# NOTE(review): indentation is lost in this view, so it cannot be told here
# whether advertise() runs only after a successful write or in every case --
# confirm against the real file.
self.advertise()
@run_in_twisted_thread
def advertise(self):
# Mark the screen image as active and (re)arm the 10 second timer that
# calls stop_advertising.
if self.state == 'active':
# Already active: only push the expiry back
self.timer.reset(10)
else:
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'active'
self.timer = reactor.callLater(10, self.stop_advertising)
# NOTE(review): indentation is lost in this view, so it cannot be told here
# whether the announcement below is made on every call or only on the
# transition to 'active' -- confirm against the real file.
# The room is weakly referenced; Null stands in for it once it is gone
room = self.room() or Null
room.dispatch_conference_info()
txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender, cpim_format=True), self.url)
room.dispatch_server_message(txt)
log.msg(txt)
@run_in_twisted_thread
def stop_advertising(self):
# Mark the screen image as idle, cancel the pending timer and announce that
# the sender stopped sharing.
# NOTE(review): indentation is lost in this view; presumably everything
# below is guarded by the state check, making a repeated call a no-op --
# confirm against the real file.
if self.state != 'idle':
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'idle'
self.timer = None
# The room is weakly referenced; Null stands in for it once it is gone
room = self.room() or Null
room.dispatch_conference_info()
txt = '%s stopped sharing the screen' % format_identity(self.sender, cpim_format=True)
room.dispatch_server_message(txt)
log.msg(txt)
class Room(object):
"""
Object representing a conference room, it will handle the message dispatching
among all the participants.
"""
implements(IObserver)
def __init__(self, uri):
    """Create a stopped, empty room for the given 'user@host' uri."""
    self.config = get_room_config(uri)
    self.uri = uri
    self.identity = CPIMIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room')
    self.state = 'stopped'

    # Participants and what they share
    self.sessions = []
    self.sessions_with_proposals = {}
    self.subscriptions = []
    self.files = []
    self.screen_images = {}
    self.transfer_handlers = weakref.WeakSet()
    self.session_nickname_map = {}
    self.last_nicknames_map = {}
    self.participants_counter = defaultdict(lambda: 0)

    # Message dispatching
    self.incoming_message_queue = coros.queue()
    self.message_dispatcher = None

    # Audio, created in start()
    self.audio_conference = None
    self.moh_player = None

    # Conference information
    self.conference_info_payload = None
    self.conference_info_version = count(1)

    self.bonjour_services = Null()
@property
def empty(self):
    """True when the room has no sessions."""
    session_count = len(self.sessions)
    return session_count == 0
@property
def started(self):
    """True while the state is 'started'."""
    current_state = self.state
    return current_state == 'started'
@property
def stopping(self):
    """True while the state is 'stopping' or 'stopped'."""
    inactive_states = ('stopping', 'stopped')
    return self.state in inactive_states
@property
def active_media(self):
    """The set of stream types in use across all sessions of the room."""
    media_types = set()
    for session in self.sessions:
        if not session.streams:
            continue
        for stream in session.streams:
            media_types.add(stream.type)
    return media_types
def start(self):
    """
    Start the room: optionally publish it over Bonjour, spawn the message
    dispatcher, create the (held) audio conference and the music-on-hold
    player, then mark the room as 'started'. Does nothing if already started.
    """
    if self.started:
        return
    publish = ServerConfig.enable_bonjour and self.identity.uri.user != 'conference'
    if publish:
        user = self.identity.uri.user
        services = BonjourServices(service='sipuri', name='Conference Room %s' % user, uri_user=user)
        self.bonjour_services = services
        services.start()
    self.message_dispatcher = proc.spawn(self._message_dispatcher)
    audio_conference = AudioConference()
    self.audio_conference = audio_conference
    audio_conference.hold()
    moh_player = MoHPlayer(audio_conference)
    self.moh_player = moh_player
    moh_player.start()
    self.state = 'started'
def stop(self):
    """
    Shut the room down: stop Bonjour publishing, terminate the dispatcher and
    the music-on-hold player, stop pending transfers, end all subscriptions,
    remove stored files and mark the room as 'stopped'. Does nothing unless
    the room is started.
    """
    if not self.started:
        return
    self.state = 'stopping'

    self.bonjour_services.stop()
    self.bonjour_services = None

    # Wake up the dispatcher with an exception, then kill it
    self.incoming_message_queue.send_exception(api.GreenletExit)
    self.incoming_message_queue = None
    self.message_dispatcher.kill(proc.ProcExit)
    self.message_dispatcher = None

    self.moh_player.stop()
    self.moh_player = None
    self.audio_conference = None

    for handler in self.transfer_handlers:
        handler.stop()

    center = NotificationCenter()
    for subscription in self.subscriptions:
        center.remove_observer(self, sender=subscription)
        subscription.end()
    self.subscriptions = []

    self.cleanup_files()
    self.conference_info_payload = None
    self.state = 'stopped'
@run_in_thread('file-io')
def cleanup_files(self):
    """Remove this room's file-transfer and screen-sharing directories, ignoring I/O errors."""
    for base_dir in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir):
        try:
            shutil.rmtree(os.path.join(base_dir, self.uri))
        except EnvironmentError:
            pass
def _message_dispatcher(self):
    """Read from self.incoming_message_queue and dispatch the messages to other participants

    Each queue item is a (session, message_type, data) tuple. Items whose
    sender URI does not match the session's remote identity are dropped, as
    are chat messages whose body starts with '?OTR:'. Chat messages are
    saved through `database` before being relayed.
    """
    while True:
        session, message_type, data = self.incoming_message_queue.wait()
        if message_type == 'message':
            message = data.message
            if message.sender.uri != session.remote_identity.uri:
                continue
            if message.body.startswith('?OTR:'):
                continue
            if message.timestamp is not None:
                value = message.timestamp
                timestamp = datetime(value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond, value.tzinfo)
            else:
                timestamp = datetime.utcnow()
            recipient = message.recipients[0]
            # Relay the message under the last nickname recorded for this URI, if any
            sender = message.sender
            sender.display_name = self.last_nicknames_map.get(str(session.remote_identity.uri), sender.display_name)
            message.sender = sender
            database.async_save_message(format_identity(session.remote_identity, True), self.uri, message.body, message.content_type, unicode(message.sender), unicode(recipient), timestamp)
            # A message is private when its single recipient is not the room itself
            private = len(message.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri
            if private:
                self.dispatch_private_message(session, message)
            else:
                self.dispatch_message(session, message)
        elif message_type == 'composing_indication':
            if data.sender.uri != session.remote_identity.uri:
                continue
            recipient = data.recipients[0]
            # Use the indication's own recipients: `message` is only bound in
            # the 'message' branch and would be unbound or stale here.
            private = len(data.recipients) == 1 and '%s@%s' % (recipient.uri.user, recipient.uri.host) != self.uri
            if private:
                self.dispatch_private_iscomposing(session, data)
            else:
                self.dispatch_iscomposing(session, data)
def dispatch_message(self, session, message):
for s in (s for s in self.sessions if s is not session):
try:
- chat_stream = (stream for stream in s.streams if stream.type == 'chat').next()
+ chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
except StopIteration:
continue
try:
chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers)
except ChatStreamError, e:
log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e))
def dispatch_private_message(self, session, message):
    """
    Deliver a private message to every other session of the recipient and,
    for replication in clients, to the sender's other sessions as well.
    """
    recipient = message.recipients[0]
    for target in self.sessions:
        if target is session:
            continue
        if target.remote_identity.uri not in (recipient.uri, session.remote_identity.uri):
            continue
        chat_stream = next((stream for stream in target.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        try:
            chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers)
        except ChatStreamError as e:
            log.error(u'Error dispatching private message to %s: %s' % (target.remote_identity.uri, e))
def dispatch_iscomposing(self, session, data):
    """Relay a composing indication from `session` to all other sessions that have a chat stream."""
    for target in self.sessions:
        if target is session:
            continue
        chat_stream = next((stream for stream in target.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        composer = CPIMIdentity.parse(format_identity(session.remote_identity, True))
        try:
            chat_stream.send_composing_indication(data.state, data.refresh, local_identity=composer, recipients=[self.identity])
        except ChatStreamError as e:
            log.error(u'Error dispatching composing indication to %s: %s' % (target.remote_identity.uri, e))
def dispatch_private_iscomposing(self, session, data):
    """Relay a private composing indication to the other sessions whose URI matches the first recipient."""
    recipient_uri = data.recipients[0].uri
    for target in self.sessions:
        if target is session or target.remote_identity.uri != recipient_uri:
            continue
        chat_stream = next((stream for stream in target.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        composer = CPIMIdentity.parse(format_identity(session.remote_identity, True))
        try:
            chat_stream.send_composing_indication(data.state, data.refresh, local_identity=composer)
        except ChatStreamError as e:
            log.error(u'Error dispatching private composing indication to %s: %s' % (target.remote_identity.uri, e))
def dispatch_server_message(self, body, content_type='text/plain', exclude=None):
    """
    Send a message on behalf of the room to every session with a chat stream
    (except `exclude`, when given) and save it through `database`.

    A ChatStreamError on one stream is logged and does not stop delivery to
    the remaining sessions nor the saving of the message, matching the other
    dispatch_* methods.
    """
    for session in self.sessions:
        if session is exclude:
            continue
        chat_stream = next((stream for stream in session.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        try:
            chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity])
        except ChatStreamError as e:
            log.error(u'Error dispatching server message to %s: %s' % (session.remote_identity.uri, e))
    self_identity = format_identity(self.identity, cpim_format=True)
    database.async_save_message(self_identity, self.uri, body, content_type, self_identity, self_identity, datetime.now())
def dispatch_conference_info(self):
    """Build the conference-info document and push it to every active subscription, ignoring SIP core errors."""
    payload = self.build_conference_info_payload()
    for subscription in self.subscriptions:
        if subscription.state != 'active':
            continue
        try:
            subscription.push_content(conference.ConferenceDocument.content_type, payload)
        except (SIPCoreError, SIPCoreInvalidStateError):
            pass
def dispatch_file(self, file):
    """Start an outgoing transfer of `file` towards each distinct participant URI other than the sender's."""
    sender_uri = CPIMIdentity.parse(file.sender).uri
    destinations = set()
    for session in self.sessions:
        uri = session.remote_identity.uri
        if str(uri) != str(sender_uri):
            destinations.add(uri)
    for uri in destinations:
        handler = OutgoingFileTransferHandler(self, uri, file)
        self.transfer_handlers.add(handler)
        handler.start()
# Register a new session with the room: observe the session and its chat/audio
# streams, count the participant, start a file-transfer handler or a welcome
# handler depending on the streams, then announce the arrival.
# NOTE(review): indentation was lost in this rendering; the nesting of the
# trailing log/announcement statements cannot be confirmed from here.
def add_session(self, session):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=session)
self.sessions.append(session)
remote_uri = str(session.remote_identity.uri)
# Number of sessions per participant URI (decremented in remove_session)
self.participants_counter[remote_uri] += 1
try:
- chat_stream = (stream for stream in session.streams if stream.type == 'chat').next()
+ chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=chat_stream)
try:
- audio_stream = (stream for stream in session.streams if stream.type == 'audio').next()
+ audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
except StopIteration:
pass
else:
notification_center.add_observer(self, sender=audio_stream)
log.msg(u'Room %s - audio stream %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate,
'encrypted' if audio_stream.srtp_active else 'unencrypted',
audio_stream.local_rtp_address, audio_stream.local_rtp_port,
audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
try:
- transfer_stream = (stream for stream in session.streams if stream.type == 'file-transfer').next()
+ transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer')
except StopIteration:
pass
else:
# 'recvonly' means the participant uploads a file; otherwise a file is being requested
if transfer_stream.direction == 'recvonly':
transfer_handler = IncomingFileTransferHandler(self, session)
transfer_handler.start()
txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size))
else:
transfer_handler = OutgoingFileTransferRequestHandler(self, session)
transfer_handler.start()
txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity, cpim_format=True), transfer_stream.file_selector.name.decode('utf-8'))
log.msg(txt)
self.dispatch_server_message(txt)
# A session carrying only the file transfer gets no welcome and no join announcement
if len(session.streams) == 1:
return
welcome_handler = WelcomeHandler(self, session)
welcome_handler.start()
self.dispatch_conference_info()
# NOTE(review): format_stream_types() already returns text starting with 'with ',
# so the two log lines below appear to print "with with ..." - confirm and fix.
if len(self.sessions) == 1:
log.msg(u'Room %s - started by %s with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams)))
else:
log.msg(u'Room %s - %s joined with %s' % (self.uri, format_identity(session.remote_identity), self.format_stream_types(session.streams)))
# Announce the join only if no other session with the same URI is already in the room
if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), self.format_stream_types(session.streams)), exclude=session)
if ServerConfig.enable_bonjour:
self._update_bonjour_presence()
def remove_session(self, session):
    """
    Detach a session from the room: stop observing it and its streams, update
    the participant bookkeeping, cancel a pending proposal timer, take its
    audio stream out of the conference and announce the departure.

    A session that carried only a file transfer is removed silently.
    """
    notification_center = NotificationCenter()
    notification_center.remove_observer(self, sender=session)
    self.sessions.remove(session)
    self.session_nickname_map.pop(session, None)
    remote_uri = str(session.remote_identity.uri)
    self.participants_counter[remote_uri] -= 1
    if self.participants_counter[remote_uri] <= 0:
        del self.participants_counter[remote_uri]
        self.last_nicknames_map.pop(remote_uri, None)
    try:
        timer = self.sessions_with_proposals.pop(session)
    except KeyError:
        pass
    else:
        if timer.active():
            timer.cancel()
    # session.streams may be None/empty at this point; normalise once so that
    # every lookup below (including the file-transfer one) is safe.
    streams = session.streams or []
    try:
        chat_stream = next(stream for stream in streams if stream.type == 'chat')
    except StopIteration:
        pass
    else:
        notification_center.remove_observer(self, sender=chat_stream)
    try:
        audio_stream = next(stream for stream in streams if stream.type == 'audio')
    except StopIteration:
        pass
    else:
        notification_center.remove_observer(self, sender=audio_stream)
        try:
            self.audio_conference.remove(audio_stream)
        except ValueError:
            # User may hangup before getting bridged into the conference
            pass
        if len(self.audio_conference.streams) == 0:
            self.moh_player.pause()
            self.audio_conference.hold()
        elif len(self.audio_conference.streams) == 1:
            self.moh_player.play()
    has_file_transfer = any(stream.type == 'file-transfer' for stream in streams)
    if has_file_transfer and len(streams) == 1:
        # File-transfer-only sessions were never announced, so leave quietly
        return
    self.dispatch_conference_info()
    log.msg(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session)))
    if not self.sessions:
        log.msg(u'Room %s - Last participant left conference' % self.uri)
    # Announce the departure only when no other session with the same URI remains
    if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
        self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session)))
    if ServerConfig.enable_bonjour:
        self._update_bonjour_presence()
def terminate_sessions(self, uri):
if not self.started:
return
for session in (session for session in self.sessions if session.remote_identity.uri == uri):
session.end()
# Build the conference-info document for the room and return it serialized
# with toxml(). The static part (description, URIs, host info) is created on
# first use and cached in self.conference_info_payload; version, state, users
# and files are refreshed on every call.
def build_conference_info_payload(self):
if self.conference_info_payload is None:
settings = SIPSimpleSettings()
conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent)
conference_description.conf_uris = conference.ConfUris()
conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation'))
if self.config.advertise_xmpp_support:
conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation'))
# TODO: add grouptextchat service uri
for number in self.config.pstn_access_numbers:
conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation'))
host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com'))
self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users())
# Each generated document takes the next value of the version counter
self.conference_info_payload.version = next(self.conference_info_version)
# participants_counter is keyed by participant URI, so this counts distinct URIs
user_count = len(self.participants_counter.keys())
self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True)
users = conference.Users()
# Sessions that carry a single file-transfer stream are not listed
for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')):
# Reuse the user entry if this URI was already added by another session
try:
- user = (user for user in users if user.entity == str(session.remote_identity.uri)).next()
+ user = next(user for user in users if user.entity == str(session.remote_identity.uri))
except StopIteration:
display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name)
user = conference.User(str(session.remote_identity.uri), display_text=display_text)
user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host)
screen_image = self.screen_images.get(user_uri, None)
if screen_image is not None and screen_image.active:
user.screen_image_url = screen_image.url
users.add(user)
joining_info = conference.JoiningInfo(when=session.start_time)
# The endpoint is reported 'on-hold' only when there is at least one holdable
# stream and all of them are on hold by the remote party
holdable_streams = [stream for stream in session.streams if stream.hold_supported]
session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected')
display_text = self.session_nickname_map.get(session, session.remote_identity.display_name)
endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status)
for stream in session.streams:
if stream.type == 'file-transfer':
continue
endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream)))
user.add(endpoint)
self.conference_info_payload.users = users
if self.files:
files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files)
self.conference_info_payload.conference_description.resources = conference.Resources(files=files)
return self.conference_info_payload.toxml()
def handle_incoming_subscription(self, subscribe_request, data):
    """
    Accept a SUBSCRIBE for the 'conference' event, answering with the current
    conference-info document; any other event is rejected with 489.
    """
    log.msg('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri))
    if subscribe_request.event == 'conference':
        NotificationCenter().add_observer(self, sender=subscribe_request)
        self.subscriptions.append(subscribe_request)
        payload = self.build_conference_info_payload()
        subscribe_request.accept(conference.ConferenceDocument.content_type, payload)
    else:
        log.msg('Room %s - Subscription rejected: only conference event is supported' % self.uri)
        subscribe_request.reject(489)
def accept_proposal(self, session, streams):
    """Forget the pending-proposal entry for `session` (KeyError if there is none) and accept the given streams."""
    del self.sessions_with_proposals[session]
    session.accept_proposal(streams)
# Announce the outcome of a file upload in the room chat and record the file.
# NOTE(review): indentation was lost in this rendering; it cannot be confirmed
# from here whether the statements after the 'uploaded' announcement (storing
# the file, refreshing conference info, pushing it out) run for 'INCOMPLETE'
# files as well or only in the else branch.
def add_file(self, file):
if file.status == 'INCOMPLETE':
self.dispatch_server_message('%s has cancelled upload of file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size)))
else:
self.dispatch_server_message('%s has uploaded file %s (%s)' % (file.sender, os.path.basename(file.name), self.format_file_size(file.size)))
self.files.append(file)
self.dispatch_conference_info()
# Optionally push the file to the other participants
if ConferenceConfig.push_file_transfer:
self.dispatch_file(file)
def add_screen_image(self, sender, image):
    """Save `image` into the ScreenImage kept for the sender's user@host key, creating the entry if needed."""
    key = '%s@%s' % (sender.uri.user, sender.uri.host)
    fallback = ScreenImage(self, sender)
    self.screen_images.setdefault(key, fallback).save(image)
def _update_bonjour_presence(self):
    """Publish the current number of participants as the room's Bonjour presence note."""
    total = len(self.sessions)
    label = {0: 'No', 1: 'One', 2: 'Two'}.get(total, str(total))
    plural = '' if total == 1 else 's'
    txt = u'%s participant%s' % (label, plural)
    presence_state = BonjourPresenceState('available', txt)
    if self.bonjour_services is not Null:
        self.bonjour_services.presence_state = presence_state
    else:
        # This is the room being published all the time
        from sylk.applications.conference import ConferenceApplication
        ConferenceApplication().bonjour_room_service.presence_state = presence_state
@run_in_twisted_thread
def handle_notification(self, notification):
    """Call the `_NH_<notification name>` method for this notification, falling back to Null when there is none."""
    getattr(self, '_NH_%s' % notification.name, Null)(notification)
def _NH_AudioStreamDidTimeout(self, notification):
    """Log the audio timeout and end the session when the timed-out stream was its only stream."""
    timed_out_stream = notification.sender
    session = timed_out_stream.session
    who = format_identity(session.remote_identity)
    log.msg(u'Room %s - audio stream for session %s timed out' % (self.uri, who))
    if session.streams == [timed_out_stream]:
        session.end()
def _NH_ChatStreamGotMessage(self, notification):
    """
    Acknowledge the received chunk with a 200 report, then queue text
    messages for dispatching or store a shared screen image.
    """
    chat_stream = notification.sender
    payload = notification.data
    chat_stream.msrp_session.send_report(payload.chunk, 200, 'OK')
    session = chat_stream.session
    message = payload.message
    content_type = message.content_type.lower()
    if content_type == 'application/blink-screensharing':
        self.add_screen_image(message.sender, message.body)
    elif content_type.startswith('text/'):
        self.incoming_message_queue.send((session, 'message', payload))
def _NH_ChatStreamGotComposingIndication(self, notification):
    """Acknowledge the received chunk with a 200 report and queue the composing indication for dispatching."""
    chat_stream = notification.sender
    payload = notification.data
    chat_stream.msrp_session.send_report(payload.chunk, 200, 'OK')
    item = (chat_stream.session, 'composing_indication', payload)
    self.incoming_message_queue.send(item)
def _NH_ChatStreamGotNicknameRequest(self, notification):
    """
    Set or clear the nickname of the requesting session. A nickname held by
    another session is rejected with 425; otherwise the request is accepted
    and the conference info is refreshed.
    """
    chat_stream = notification.sender
    session = chat_stream.session
    nickname = notification.data.nickname
    chunk = notification.data.chunk
    if not nickname:
        # An empty nickname clears whatever was set before
        self.session_nickname_map.pop(session, None)
        self.last_nicknames_map.pop(str(session.remote_identity.uri), None)
    else:
        in_use = nickname in self.session_nickname_map.values()
        if in_use and self.session_nickname_map.get(session) != nickname:
            chat_stream.reject_nickname(chunk, 425, 'Nickname reserved or already in use')
            return
        self.session_nickname_map[session] = nickname
        self.last_nicknames_map[str(session.remote_identity.uri)] = nickname
    chat_stream.accept_nickname(chunk)
    self.dispatch_conference_info()
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
    """Drop an ended subscription from the room and stop observing it, if it is still registered."""
    ended = notification.sender
    if ended in self.subscriptions:
        self.subscriptions.remove(ended)
        notification.center.remove_observer(self, sender=ended)
# Log hold / unhold changes made by the remote party and refresh the
# conference info.
# NOTE(review): indentation was lost in this rendering; it cannot be confirmed
# from here whether dispatch_conference_info() runs only for remote-originated
# changes or for every hold state change.
def _NH_SIPSessionDidChangeHoldState(self, notification):
session = notification.sender
if notification.data.originator == 'remote':
if notification.data.on_hold:
log.msg(u'Room %s - %s has put the audio session on hold' % (self.uri, format_identity(session.remote_identity)))
else:
log.msg(u'Room %s - %s has taken the audio session out of hold' % (self.uri, format_identity(session.remote_identity)))
self.dispatch_conference_info()
def _NH_SIPSessionNewProposal(self, notification):
    """
    For a remote proposal, schedule acceptance (after 4 seconds) of the first
    proposed audio stream and the first proposed chat stream; reject the
    proposal when it contains neither.
    """
    if notification.data.originator != 'remote':
        return
    session = notification.sender
    streams = []
    for wanted_type in ('audio', 'chat'):
        candidates = [stream for stream in notification.data.proposed_streams if stream.type == wanted_type]
        if candidates:
            streams.append(candidates[0])
    if not streams:
        session.reject_proposal()
        return
    self.sessions_with_proposals[session] = reactor.callLater(4, self.accept_proposal, session, streams)
def _NH_SIPSessionProposalRejected(self, notification):
    """Cancel the pending acceptance timer of a rejected proposal, if one was scheduled."""
    # If the proposal couldn't be accepted by us there is no timer to cancel
    timer = self.sessions_with_proposals.pop(notification.sender, None)
    if timer is not None and timer.active():
        timer.cancel()
# React to streams being added to or removed from a session: start/stop
# observing them, announce the change to the other participants, keep the
# audio conference and music-on-hold in sync, and end sessions left without
# streams.
# NOTE(review): indentation was lost in this rendering; the exact nesting of
# the welcome handler, the music-on-hold checks and the final statements
# cannot be confirmed from here.
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
session = notification.sender
for stream in notification.data.added_streams:
notification.center.add_observer(self, sender=stream)
txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type)
log.msg(u'Room %s - %s' % (self.uri, txt))
self.dispatch_server_message(txt, exclude=session)
if stream.type == 'audio':
log.msg(u'Room %s - audio stream %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate,
'encrypted' if stream.srtp_active else 'unencrypted',
stream.local_rtp_address, stream.local_rtp_port,
stream.remote_rtp_address, stream.remote_rtp_port))
# The welcome handler is started without the welcome prompt here
welcome_handler = WelcomeHandler(self, session)
welcome_handler.start(welcome_prompt=False)
for stream in notification.data.removed_streams:
notification.center.remove_observer(self, sender=stream)
txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type)
log.msg(u'Room %s - %s' % (self.uri, txt))
self.dispatch_server_message(txt, exclude=session)
if stream.type == 'audio':
try:
self.audio_conference.remove(stream)
except ValueError:
# User may hangup before getting bridged into the conference
pass
# No streams left: pause music-on-hold and hold the conference;
# exactly one stream left: play music-on-hold
if len(self.audio_conference.streams) == 0:
self.moh_player.pause()
self.audio_conference.hold()
elif len(self.audio_conference.streams) == 1:
self.moh_player.play()
if not session.streams:
log.msg(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity)))
session.end()
self.dispatch_conference_info()
def _NH_SIPSessionTransferNewIncoming(self, notification):
    """Log and reject (403) a call transfer request received inside an existing session."""
    requesting_session = notification.sender
    log.msg(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri)
    requesting_session.reject_transfer(403)
@staticmethod
def format_stream_types(streams):
if not streams:
return ''
if len(streams) == 1:
txt = 'with %s' % streams[0].type
else:
txt = 'with %s' % ','.join(stream.type for stream in streams[:-1])
txt += ' and %s' % streams[-1:][0].type
return txt
@staticmethod
def format_conference_stream_type(stream):
if stream.type == 'chat':
return 'message'
return stream.type
@staticmethod
def format_session_duration(session):
if session.start_time:
duration = session.end_time - session.start_time
seconds = duration.seconds if duration.microseconds < 500000 else duration.seconds+1
minutes, seconds = seconds / 60, seconds % 60
hours, minutes = minutes / 60, minutes % 60
hours += duration.days*24
if not minutes and not hours:
duration_text = '%d seconds' % seconds
elif not hours:
duration_text = '%02d:%02d' % (minutes, seconds)
else:
duration_text = '%02d:%02d:%02d' % (hours, minutes, seconds)
else:
duration_text = '0s'
return duration_text
@staticmethod
def format_file_size(size):
infinite = float('infinity')
boundaries = [( 1024, '%d bytes', 1),
( 10*1024, '%.2f KB', 1024.0), ( 1024*1024, '%.1f KB', 1024.0),
( 10*1024*1024, '%.2f MB', 1024*1024.0), (1024*1024*1024, '%.1f MB', 1024*1024.0),
(10*1024*1024*1024, '%.2f GB', 1024*1024*1024.0), ( infinite, '%.1f GB', 1024*1024*1024.0)]
for boundary, format, divisor in boundaries:
if size < boundary:
return format % (size/divisor,)
else:
return "%d bytes" % size
class MoHPlayer(object):
    """Cycles through the wav files found under sounds/moh, playing them on a conference bridge."""
    implements(IObserver)

    def __init__(self, conference):
        self.conference = conference
        self.files = None
        self.paused = None
        self._player = None

    def start(self):
        """Create the (paused) wave player and attach it to the conference bridge; logs an error if there are no files."""
        wav_files = glob('%s/*.wav' % ResourcePath('sounds/moh').normalized)
        if not wav_files:
            log.error(u'No files found, MoH is disabled')
            return
        random.shuffle(wav_files)
        self.files = cycle(wav_files)
        player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20)
        self._player = player
        self.paused = True
        self.conference.bridge.add(player)
        NotificationCenter().add_observer(self, sender=player)

    def stop(self):
        """Stop the player and detach it from the bridge; does nothing if start() never created a player."""
        player = self._player
        if player is None:
            return
        NotificationCenter().remove_observer(self, sender=player)
        player.stop()
        self.paused = True
        self.conference.bridge.remove(player)
        self.conference = None

    def play(self):
        """Resume playback with the next file, if currently paused."""
        if self._player is None or not self.paused:
            return
        self.paused = False
        self._play_next_file()

    def pause(self):
        """Stop playback, if currently playing."""
        if self._player is None or self.paused:
            return
        self.paused = True
        self._player.stop()

    def _play_next_file(self):
        player = self._player
        player.filename = next(self.files)
        player.play()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        getattr(self, '_NH_%s' % notification.name, Null)(notification)

    def _NH_WavePlayerDidFail(self, notification):
        # Move on to the next file unless playback was paused meanwhile
        if self.paused:
            return
        self._play_next_file()

    _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail
class InterruptWelcome(Exception):
    """Used to kill the welcome procs of a WelcomeHandler when its session is about to end."""
class WelcomeHandler(object):
    """Greets a session that joined a room, over its audio and chat streams."""
    implements(IObserver)

    def __init__(self, room, session):
        self.procs = proc.RunningProcSet()
        self.session = session
        self.room = room
@run_in_green_thread
def start(self, welcome_prompt=True):
    """
    Run the audio and chat welcome procs, wait for both to finish, then stop
    observing the session and drop the references to it and to the room.
    """
    center = NotificationCenter()
    center.add_observer(self, sender=self.session)
    for greeter in (self.play_audio_welcome, self.render_chat_welcome):
        self.procs.spawn(greeter, welcome_prompt)
    self.procs.waitall()
    center.remove_observer(self, sender=self.session)
    self.session = None
    self.room = None
def play_file_in_player(self, player, file, delay):
    """Play `file` in `player` using `delay` as its pause time and wait for it to end; playback errors are logged."""
    player.filename = file
    player.pause_time = delay
    try:
        playback = player.play()
        playback.wait()
    except WavePlayerError as e:
        log.warning(u"Error playing file %s: %s" % (file, e))
# Play the audio greeting on the session's audio stream (when it has one) and
# afterwards bridge the stream into the room's audio conference.
# NOTE(review): indentation was lost in this rendering; in particular it cannot
# be confirmed from here which of the announcements are guarded by
# `welcome_prompt` and where the elif branches end.
def play_audio_welcome(self, welcome_prompt):
try:
audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio')
except StopIteration:
return
# Temporary player attached to the stream's own bridge for the announcements
player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
audio_stream.bridge.add(player)
try:
if welcome_prompt:
file = ResourcePath('sounds/co_welcome_conference.wav').normalized
self.play_file_in_player(player, file, 1)
# Distinct URIs of the other participants that have an audio stream
user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)]))
if user_count == 0:
file = ResourcePath('sounds/co_only_one.wav').normalized
self.play_file_in_player(player, file, 0.5)
elif user_count == 1:
file = ResourcePath('sounds/co_there_is_one.wav').normalized
self.play_file_in_player(player, file, 0.5)
elif user_count < 100:
file = ResourcePath('sounds/co_there_are.wav').normalized
self.play_file_in_player(player, file, 0.2)
# Up to 24 a single number file is used; above that a tens file and a units file
if user_count <= 24:
file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized
self.play_file_in_player(player, file, 0.1)
else:
# NOTE(review): relies on `/` being integer division here - confirm
file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized
self.play_file_in_player(player, file, 0.1)
file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized
self.play_file_in_player(player, file, 0.1)
file = ResourcePath('sounds/co_more_participants.wav').normalized
self.play_file_in_player(player, file, 0)
file = ResourcePath('sounds/connected_tone.wav').normalized
self.play_file_in_player(player, file, 0.1)
except InterruptWelcome:
# No need to remove the bridge from the stream, it's done automatically
pass
else:
# Greeting finished without interruption: move the stream into the conference
audio_stream.bridge.remove(player)
self.room.audio_conference.add(audio_stream)
self.room.audio_conference.unhold()
# Music-on-hold plays only while a single stream is in the conference
if len(self.room.audio_conference.streams) == 1:
self.room.moh_player.play()
else:
self.room.moh_player.pause()
finally:
player.stop()
# Build the text of the chat welcome message: a greeting, the number of other
# participants and, depending on the room configuration, the addresses through
# which others can join.
# NOTE(review): indentation was lost in this rendering, so the nesting of the
# address lines below cannot be confirmed from here; whitespace inside the
# string literals may also have been collapsed by the same rendering.
def render_chat_welcome_prompt(self):
txt = 'Welcome to SylkServer!'
# Distinct URIs in the room other than the one being welcomed
user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)]))
if user_count == 0:
txt += ' You are the first participant'
else:
if user_count == 1:
txt += ' There is one more participant'
else:
txt += ' There are %s more participants' % user_count
txt += ' in this conference room.'
# NOTE(review): `True or ...` makes this condition always true, so the
# ServerConfig.enable_bonjour check is never evaluated
if True or not ServerConfig.enable_bonjour:
if self.room.config.advertise_xmpp_support or self.room.config.pstn_access_numbers:
txt += '\n\nOther participants can join at these addresses:\n\n'
if self.room.config.pstn_access_numbers:
if len(self.room.config.pstn_access_numbers) == 1:
nums = self.room.config.pstn_access_numbers[0]
else:
nums = ', '.join(self.room.config.pstn_access_numbers[:-1]) + ' or %s' % self.room.config.pstn_access_numbers[-1]
txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums
if self.room.config.advertise_xmpp_support:
txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri
txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri
txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri
return txt
def render_chat_welcome(self, welcome_prompt):
    """
    Send the welcome text over the session's chat stream (when it has one),
    then replay the stored messages that were addressed to the room or to
    this participant, or were sent by this participant.
    """
    chat_stream = next((stream for stream in self.session.streams if stream.type == 'chat'), None)
    if chat_stream is None:
        return
    try:
        welcome_text = self.render_chat_welcome_prompt()
        chat_stream.send_message(welcome_text, 'text/plain', local_identity=self.room.identity, recipients=[self.room.identity])
        remote_identity = CPIMIdentity.parse(format_identity(self.session.remote_identity, cpim_format=True))
        for msg in database.get_last_messages(self.room.uri, ConferenceConfig.replay_history):
            recipient = CPIMIdentity.parse(msg.cpim_recipient)
            sender = CPIMIdentity.parse(msg.cpim_sender)
            addressed_here = recipient.uri in (self.room.identity.uri, remote_identity.uri)
            if addressed_here or sender.uri == remote_identity.uri:
                chat_stream.send_message(msg.cpim_body, msg.cpim_content_type, local_identity=sender, recipients=[recipient], timestamp=msg.cpim_timestamp)
    except InterruptWelcome:
        pass
def handle_notification(self, notification):
    """Call the `_NH_<notification name>` method for this notification, falling back to Null when there is none."""
    getattr(self, '_NH_%s' % notification.name, Null)(notification)
def _NH_SIPSessionWillEnd(self, notification):
    """Kill the running welcome procs with InterruptWelcome when the session is about to end."""
    running = self.procs
    running.killall(InterruptWelcome)
class RoomFile(object):
    """Description of a file stored in a room: name, hash, size, sender and status."""

    def __init__(self, name, hash, size, sender, status):
        self.name, self.hash, self.size = name, hash, size
        self.sender, self.status = sender, status

    @property
    def file_selector(self):
        """A FileSelector for the stored file, built from its UTF-8 encoded name and its hash."""
        encoded_name = self.name.encode('utf-8')
        return FileSelector.for_file(encoded_name, hash=self.hash)
class IncomingFileTransferHandler(object):
    implements(IObserver)

    def __init__(self, room, session):
        """
        Bind the handler to the session's first 'recvonly' file-transfer
        stream (StopIteration is raised if there is none). The room is only
        referenced weakly.
        """
        self.room = weakref.ref(room)
        self.room_uri = room.uri
        self.session = session
        incoming = (stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly')
        self.stream = next(incoming)

        # Transfer progress flags
        self.error = False
        self.ended = False
        self.transfer_finished = False

        # Filled in by start() and while the transfer runs
        self.file = None
        self.file_selector = None
        self.filename = None
        self.hash = None
        self.status = None
        self.timer = None
def start(self):
    """
    Open the destination file for the incoming transfer inside the room's
    transfer directory and start observing the session and the stream.

    If a file with the same name exists, '_<n>' is inserted before the
    extension. If the file cannot be opened, the session is ended.
    """
    self.file_selector = self.stream.file_selector
    path = os.path.join(ConferenceConfig.file_transfer_dir, self.room_uri)
    makedirs(path)
    # The name is chosen by the remote party: keep only its last path
    # component so that it cannot point outside the room's directory.
    name = os.path.basename(self.file_selector.name.decode('utf-8'))
    self.filename = filename = os.path.join(path, name)
    basename, ext = os.path.splitext(filename)
    i = 1
    while os.path.exists(filename):
        filename = '%s_%d%s' % (basename, i, ext)
        i += 1
    self.filename = filename
    try:
        self.file = open(self.filename, 'wb')
    except EnvironmentError:
        log.msg('Room %s - cannot write destination filename: %s' % (self.room_uri, self.filename))
        self.session.end()
        return
    notification_center = NotificationCenter()
    notification_center.add_observer(self, sender=self)
    notification_center.add_observer(self, sender=self.session)
    notification_center.add_observer(self, sender=self.stream)
    self.hash = hashlib.sha1()
@run_in_thread('file-transfer')
def write_chunk(self, data):
notification_center = NotificationCenter()
if data is not None:
try:
self.file.write(data)
except EnvironmentError, e:
notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e)))
else:
self.hash.update(data)
else:
self.file.close()
if self.error:
notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self)
else:
notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self)
@run_in_thread('file-io')
def remove_bogus_file(self, filename):
try:
os.unlink(filename)
except OSError:
pass
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPSessionDidEnd(self, notification):
self.ended = True
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.timer = None
notification.center.remove_observer(self, sender=self.stream)
notification.center.remove_observer(self, sender=self.session)
# Mark end of write operation
self.write_chunk(None)
def _NH_FileTransferStreamGotChunk(self, notification):
self.write_chunk(notification.data.content)
def _NH_FileTransferStreamDidFinish(self, notification):
self.transfer_finished = True
if self.timer is None:
self.timer = reactor.callLater(5, self.session.end)
def _NH_IncomingFileTransferHandlerGotError(self, notification):
log.error('Error while handling incoming file transfer: %s' % notification.data.error)
self.error = True
self.status = notification.data.error
if not self.ended and self.timer is None:
self.timer = reactor.callLater(5, self.session.end)
def _NH_IncomingFileTransferHandlerDidEnd(self, notification):
notification.center.remove_observer(self, sender=self)
remote_hash = self.file_selector.hash
if not self.transfer_finished:
log.msg('File transfer of %s cancelled' % os.path.basename(self.filename))
self.remove_bogus_file(self.filename)
self.status = 'INCOMPLETE'
else:
local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper()))
if local_hash != remote_hash:
log.warning('Hash of transferred file does not match the remote hash (file may have changed).')
self.status = 'Hash missmatch'
self.remove_bogus_file(self.filename)
else:
self.status = 'OK'
file = RoomFile(self.filename, remote_hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status)
room = self.room() or Null
room.add_file(file)
self.session = None
self.stream = None
def _NH_IncomingFileTransferHandlerDidFail(self, notification):
notification.center.remove_observer(self, sender=self)
file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, format_identity(self.session.remote_identity, cpim_format=True), self.status)
room = self.room() or Null
room.add_file(file)
self.session = None
self.stream = None
class OutgoingFileTransferRequestHandler(object):
    """
    Observe a session that carries a file-transfer stream and end the
    session shortly after the stream reports that it finished.
    """
    implements(IObserver)

    def __init__(self, room, session):
        self.room = weakref.ref(room)
        self.session = session
        # Raises StopIteration when the session has no file-transfer stream.
        self.stream = next(stream for stream in self.session.streams if stream.type == 'file-transfer')
        self.timer = None

    def start(self):
        """Subscribe to notifications from the session and from the stream."""
        center = NotificationCenter()
        for sender in (self.session, self.stream):
            center.add_observer(self, sender=sender)

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Route a notification to the method named _NH_<notification.name>."""
        handler_name = '_NH_%s' % notification.name
        getattr(self, handler_name, Null)(notification)

    def _NH_FileTransferStreamDidFinish(self, notification):
        if self.timer is not None:
            return
        # End the session 2 seconds after the stream finished.
        self.timer = reactor.callLater(2, self.session.end)

    def _NH_SIPSessionDidEnd(self, notification):
        pending = self.timer
        if pending is not None and pending.active():
            pending.cancel()
        self.timer = None
        center = NotificationCenter()
        center.remove_observer(self, sender=self.stream)
        center.remove_observer(self, sender=self.session)
        # Drop the references once the session is over.
        self.session = None
        self.stream = None

    # A failed session is cleaned up the same way as an ended one.
    _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd
class InterruptFileTransfer(Exception):
    """Exception type for interrupting a file transfer.

    NOTE(review): it is not raised anywhere in the code visible here;
    presumably it is thrown into a running transfer - confirm at the call sites.
    """
class OutgoingFileTransferHandler(object):
    """
    Push a room file to a destination by creating an outgoing session with
    a send-only file-transfer stream.
    """
    implements(IObserver)

    def __init__(self, room, destination, file):
        self.room_uri = room.identity.uri
        self.destination = destination
        self.file = file
        self.session = None
        self.stream = None
        self.timer = None

    @run_in_green_thread
    def start(self):
        """Resolve the route to the destination and connect the session.

        Returns silently, leaving self.session unset, when the DNS lookup fails.
        """
        self.greenlet = api.getcurrent()
        settings = SIPSimpleSettings()
        account = AccountManager().sylkserver_account
        proxy = account.sip.outbound_proxy
        # Route through the outbound proxy when one is configured, otherwise
        # look up the destination itself.
        if proxy is None:
            uri = SIPURI.new(self.destination)
        else:
            uri = SIPURI(host=proxy.host,
                         port=proxy.port,
                         parameters={'transport': proxy.transport})
        lookup = DNSLookup()
        try:
            routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
        except DNSLookupError:
            return
        center = NotificationCenter()
        self.session = Session(account)
        self.stream = FileTransferStream(self.file.file_selector, 'sendonly')
        center.add_observer(self, sender=self.session)
        center.add_observer(self, sender=self.stream)
        subject = u'File uploaded by %s' % self.file.sender
        from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference File Transfer')
        to_header = ToHeader(SIPURI.new(self.destination))
        transport = routes[0].transport
        # The transport parameter is left out of the contact URI for UDP.
        if transport == 'udp':
            parameters = {}
        else:
            parameters = {'transport': transport}
        local_port = getattr(Engine(), '%s_port' % transport)
        contact_uri = SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=local_port, parameters=parameters)
        contact_header = ContactHeader(contact_uri)
        extra_headers = []
        if ThorNodeConfig.enabled:
            extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
        originator_uri = CPIMIdentity.parse(self.file.sender).uri
        extra_headers.append(Header('X-Originator-From', str(originator_uri)))
        self.session.connect(from_header, to_header, contact_header=contact_header, routes=routes, streams=[self.stream], is_focus=True, subject=subject, extra_headers=extra_headers)

    def stop(self):
        """End the session if one was created."""
        if self.session is None:
            return
        self.session.end()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Route a notification to the method named _NH_<notification.name>."""
        handler_name = '_NH_%s' % notification.name
        getattr(self, handler_name, Null)(notification)

    def _NH_FileTransferStreamDidFinish(self, notification):
        if self.timer is not None:
            return
        # End the session 2 seconds after the stream finished.
        self.timer = reactor.callLater(2, self.session.end)

    def _NH_SIPSessionDidEnd(self, notification):
        pending = self.timer
        if pending is not None and pending.active():
            pending.cancel()
        self.timer = None
        center = NotificationCenter()
        center.remove_observer(self, sender=self.stream)
        center.remove_observer(self, sender=self.session)
        self.session = None
        self.stream = None

    # A failed session is cleaned up the same way as an ended one.
    _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd
diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py
index b4218d3..11c7a62 100644
--- a/sylk/applications/ircconference/room.py
+++ b/sylk/applications/ircconference/room.py
@@ -1,619 +1,619 @@
# Copyright (C) 2011 AG Projects. See LICENSE for details.
#
import random
import urllib
from itertools import count
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from eventlib import coros, proc
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError
from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.streams.msrp import ChatStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import protocol, reactor
from twisted.words.protocols import irc
from zope.interface import implements
from sylk.applications.ircconference.configuration import get_room_configuration
from sylk.applications.ircconference.logger import log
from sylk.configuration.datatypes import ResourcePath
def format_identity(identity, cpim_format=False):
    """Render an identity as a SIP address string.

    With a display name the result is 'Name <sip:user@host>'. Without one
    it is '<sip:user@host>' when cpim_format is true and 'sip:user@host'
    otherwise.
    """
    address = identity.uri
    if identity.display_name:
        return u'%s <sip:%s@%s>' % (identity.display_name, address.user, address.host)
    if cpim_format:
        return u'<sip:%s@%s>' % (address.user, address.host)
    return u'sip:%s@%s' % (address.user, address.host)
def format_stream_types(streams):
    """Describe the stream types of a session for log and chat messages.

    Returns '' for no streams, 'with a' for one stream, and
    'with a,b and c' (the last type joined by ' and ') for several.
    """
    if not streams:
        return ''
    if len(streams) == 1:
        return 'with %s' % streams[0].type
    leading = ','.join(stream.type for stream in streams[:-1])
    last = streams[-1]
    return ('with %s' % leading) + (' and %s' % last.type)
def format_session_duration(session):
    """Return a human readable duration for a session.

    The duration is end_time - start_time, rounded to the nearest second.
    It is rendered as 'N seconds' below one minute, 'MM:SS' below one hour
    and 'HH:MM:SS' otherwise, with days folded into the hour count.
    Returns '0s' when the session has no start time.
    """
    if not session.start_time:
        return '0s'
    duration = session.end_time - session.start_time
    # Round half a second or more up to the next full second.
    total_seconds = duration.seconds + (0 if duration.microseconds < 500000 else 1)
    # divmod floors explicitly, so the result does not depend on how the
    # running interpreter treats '/' between integers.
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    hours += duration.days * 24
    if not minutes and not hours:
        return '%d seconds' % seconds
    if not hours:
        return '%02d:%02d' % (minutes, seconds)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)
def format_conference_stream_type(stream):
    """Return the media type name for a stream: 'message' for chat, else stream.type."""
    return 'message' if stream.type == 'chat' else stream.type
class IRCMessage(object):
    """A message body with its content type and a CPIM sender identity.

    The sender is built from the given URI, using username as display name.
    """

    def __init__(self, username, uri, body, content_type='text/plain'):
        self.body = body
        self.content_type = content_type
        self.sender = CPIMIdentity(uri, display_name=username)
class IRCRoom(object):
    """
    Object representing a conference room, it will handle the message dispatching
    among all the participants.
    """
    __metaclass__ = Singleton
    implements(IObserver)

    def __init__(self, uri):
        self.uri = uri
        self.identity = CPIMIdentity.parse('<sip:%s>' % self.uri)
        self.sessions = []
        self.sessions_with_proposals = []
        self.subscriptions = []
        # Messages for IRC that arrived while no IRC protocol was connected.
        self.pending_messages = []
        self.state = 'stopped'
        self.incoming_message_queue = coros.queue()
        self.message_dispatcher = None
        self.audio_conference = None
        self.conference_info_payload = None
        self.conference_info_version = count(1)
        self.irc_connector = None
        self.irc_protocol = None

    @classmethod
    def get_room(cls, uri):
        """Return the room for the 'user@host' part of the given URI."""
        room_uri = '%s@%s' % (uri.user, uri.host)
        room = cls(room_uri)
        return room

    @property
    def empty(self):
        return len(self.sessions) == 0

    @property
    def started(self):
        return self.state == 'started'

    def start(self):
        """Connect to the configured IRC server and start dispatching messages.

        Does nothing unless the room is in the 'stopped' state.
        """
        if self.state != 'stopped':
            return
        config = get_room_configuration(self.uri.split('@')[0])
        factory = IRCBotFactory(config)
        host, port = config.server
        self.irc_connector = reactor.connectTCP(host, port, factory)
        NotificationCenter().add_observer(self, sender=self.irc_connector.factory)
        self.message_dispatcher = proc.spawn(self._message_dispatcher)
        self.audio_conference = AudioConference()
        self.audio_conference.hold()
        self.state = 'started'

    def stop(self):
        """Disconnect from IRC and stop the dispatcher.

        Does nothing unless the room is in the 'started' state.
        """
        if self.state != 'started':
            return
        self.state = 'stopped'
        NotificationCenter().remove_observer(self, sender=self.irc_connector.factory)
        # Tell the factory not to reconnect after this disconnect.
        self.irc_connector.factory.stop_requested = True
        self.irc_connector.disconnect()
        self.irc_connector = None
        self.message_dispatcher.kill(proc.ProcExit)
        self.moh_player = None
        self.audio_conference = None

    def _message_dispatcher(self):
        """Read from self.incoming_message_queue and dispatch the messages to other participants"""
        while True:
            session, message_type, data = self.incoming_message_queue.wait()
            if message_type == 'msrp_message':
                if data.sender.uri != session.remote_identity.uri:
                    # Drop messages whose sender does not match the session,
                    # but keep the dispatcher running for later messages.
                    continue
                self.dispatch_message(session, data)
            elif message_type == 'irc_message':
                self.dispatch_irc_message(data)

    def dispatch_message(self, session, message):
        """Forward a chat message from one session to every other session with a chat stream."""
        for s in (s for s in self.sessions if s is not session):
            try:
                identity = CPIMIdentity.parse(format_identity(session.remote_identity, True))
                chat_stream = next(stream for stream in s.streams if stream.type == 'chat')
            except StopIteration:
                # The session has no chat stream.
                pass
            else:
                try:
                    chat_stream.send_message(message.body, message.content_type, local_identity=identity, recipients=[self.identity], timestamp=message.timestamp)
                except ChatStreamError as e:
                    log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e))

    def dispatch_irc_message(self, message):
        """Forward a message received from IRC to every session with a chat stream."""
        for session in self.sessions:
            try:
                chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
            except StopIteration:
                pass
            else:
                try:
                    chat_stream.send_message(message.body, message.content_type, local_identity=message.sender, recipients=[self.identity])
                except ChatStreamError as e:
                    log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e))

    def dispatch_server_message(self, body, content_type='text/plain', exclude=None):
        """Send a message from the room itself to every session except 'exclude'."""
        for session in (session for session in self.sessions if session is not exclude):
            try:
                chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
            except StopIteration:
                pass
            else:
                try:
                    chat_stream.send_message(body, content_type, local_identity=self.identity, recipients=[self.identity])
                except ChatStreamError as e:
                    log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e))

    def get_conference_info(self):
        # Send request to get participants list, we'll get a notification with it
        if self.irc_protocol is not None:
            self.irc_protocol.get_participants()
        else:
            self.dispatch_conference_info([])

    def dispatch_conference_info(self, irc_participants):
        """Push the current conference document to every active subscription."""
        data = self.build_conference_info_payload(irc_participants)
        for subscription in (subscription for subscription in self.subscriptions if subscription.state == 'active'):
            try:
                subscription.push_content(ConferenceDocument.content_type, data)
            except (SIPCoreError, SIPCoreInvalidStateError):
                pass

    def build_conference_info_payload(self, irc_participants):
        """Build the conference document from the SIP sessions and the IRC nicks.

        Returns the document serialized with toxml().
        """
        irc_configuration = get_room_configuration(self.uri.split('@')[0])
        if self.conference_info_payload is None:
            settings = SIPSimpleSettings()
            conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent)
            host_info = HostInfo(web_page=WebPage(irc_configuration.website))
            self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users())
        self.conference_info_payload.version = next(self.conference_info_version)
        # Sessions sharing a remote URI count as a single user.
        user_count = len(set(str(s.remote_identity.uri) for s in self.sessions)) + len(irc_participants)
        self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True)
        users = Users()
        for session in self.sessions:
            try:
                user = next(user for user in users if user.entity == str(session.remote_identity.uri))
            except StopIteration:
                user = User(str(session.remote_identity.uri), display_text=session.remote_identity.display_name)
                users.add(user)
            joining_info = JoiningInfo(when=session.start_time)
            holdable_streams = [stream for stream in session.streams if stream.hold_supported]
            session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
            hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected')
            endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status)
            for stream in session.streams:
                endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream)))
            user.add(endpoint)
        for nick in irc_participants:
            irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0])
            user = User(irc_uri, display_text=nick)
            users.add(user)
            endpoint = Endpoint(irc_uri, display_text=nick)
            endpoint.add(Media(random.randint(100000000, 999999999), media_type='message'))
            user.add(endpoint)
        self.conference_info_payload.users = users
        return self.conference_info_payload.toxml()

    def add_session(self, session):
        """Register a session with the room and announce the new participant."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=session)
        self.sessions.append(session)
        try:
            chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
        except StopIteration:
            pass
        else:
            notification_center.add_observer(self, sender=chat_stream)
        try:
            audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
        except StopIteration:
            pass
        else:
            notification_center.add_observer(self, sender=audio_stream)
            log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate,
                                                                                      'encrypted' if audio_stream.srtp_active else 'unencrypted',
                                                                                      audio_stream.local_rtp_address, audio_stream.local_rtp_port,
                                                                                      audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
            self.play_audio_welcome(session)
        self.get_conference_info()
        if len(self.sessions) == 1:
            log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
        else:
            log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
        # Only announce the join if the same URI is not already present through another session.
        if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
            self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session)

    def remove_session(self, session):
        """Unregister a session from the room and announce the departure."""
        notification_center = NotificationCenter()
        try:
            chat_stream = next(stream for stream in session.streams or [] if stream.type == 'chat')
        except StopIteration:
            pass
        else:
            notification_center.remove_observer(self, sender=chat_stream)
        try:
            audio_stream = next(stream for stream in session.streams or [] if stream.type == 'audio')
        except StopIteration:
            pass
        else:
            notification_center.remove_observer(self, sender=audio_stream)
            try:
                self.audio_conference.remove(audio_stream)
            except ValueError:
                # User may hangup before getting bridged into the conference
                pass
            if len(self.audio_conference.streams) == 0:
                self.audio_conference.hold()
        notification_center.remove_observer(self, sender=session)
        self.sessions.remove(session)
        self.get_conference_info()
        log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session)))
        if not self.sessions:
            log.msg(u'Last participant left conference %s' % self.uri)
        # Only announce the departure if the same URI has no other session left in the room.
        if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
            self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session)))

    def accept_proposal(self, session, streams):
        """Accept a pending stream proposal, unless it is no longer pending."""
        if session in self.sessions_with_proposals:
            session.accept_proposal(streams)
            self.sessions_with_proposals.remove(session)

    def _play_file_in_player(self, player, file, delay):
        """Play a file in the given player and wait for it; playback errors are only logged."""
        player.filename = file
        player.pause_time = delay
        try:
            player.play().wait()
        except WavePlayerError as e:
            log.warning(u"Error playing file %s: %s" % (file, e))

    @run_in_green_thread
    def play_audio_welcome(self, session, welcome_prompt=True):
        """Play the welcome and participant-count prompts, then bridge the audio stream.

        Raises StopIteration if the session has no audio stream.
        """
        audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
        player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
        audio_stream.bridge.add(player)
        if welcome_prompt:
            file = ResourcePath('sounds/co_welcome_conference.wav').normalized
            self._play_file_in_player(player, file, 1)
        # Number of other distinct URIs that have an audio stream in the room.
        user_count = len(set(str(s.remote_identity.uri) for s in self.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(session.remote_identity.uri)]))
        if user_count == 0:
            file = ResourcePath('sounds/co_only_one.wav').normalized
            self._play_file_in_player(player, file, 0.5)
        elif user_count == 1:
            file = ResourcePath('sounds/co_there_is.wav').normalized
            self._play_file_in_player(player, file, 0.5)
        elif user_count < 100:
            file = ResourcePath('sounds/co_there_are.wav').normalized
            self._play_file_in_player(player, file, 0.2)
            if user_count <= 24:
                file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized
                self._play_file_in_player(player, file, 0.1)
            else:
                # Tens and units are played from separate files; '//' keeps
                # the tens digit an integer regardless of division semantics.
                file = ResourcePath('sounds/bi_%d0.wav' % (user_count // 10)).normalized
                self._play_file_in_player(player, file, 0.1)
                file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized
                self._play_file_in_player(player, file, 0.1)
            file = ResourcePath('sounds/co_more_participants.wav').normalized
            self._play_file_in_player(player, file, 0)
        audio_stream.bridge.remove(player)
        self.audio_conference.add(audio_stream)
        self.audio_conference.unhold()

    def handle_incoming_subscription(self, subscribe_request, data):
        """Accept 'conference' event subscriptions and reject anything else with 489."""
        if subscribe_request.event != 'conference':
            subscribe_request.reject(489)
            return
        NotificationCenter().add_observer(self, sender=subscribe_request)
        subscribe_request.accept()
        self.subscriptions.append(subscribe_request)
        self.get_conference_info()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Route a notification to the method named _NH_<notification.name>."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
        subscription = notification.sender
        notification_center = NotificationCenter()
        notification_center.remove_observer(self, sender=subscription)
        self.subscriptions.remove(subscription)

    def _NH_SIPSessionDidChangeHoldState(self, notification):
        session = notification.sender
        if notification.data.originator == 'remote':
            if notification.data.on_hold:
                log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity))
            else:
                log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity))
            self.get_conference_info()

    def _NH_SIPSessionNewProposal(self, notification):
        if notification.data.originator == 'remote':
            session = notification.sender
            audio_streams = [stream for stream in notification.data.proposed_streams if stream.type=='audio']
            chat_streams = [stream for stream in notification.data.proposed_streams if stream.type=='chat']
            if not audio_streams and not chat_streams:
                session.reject_proposal()
                return
            if chat_streams:
                chat_streams[0].chatroom_capabilities = []
            # Accept at most one audio and one chat stream.
            streams = [streams[0] for streams in (audio_streams, chat_streams) if streams]
            self.sessions_with_proposals.append(session)
            # The proposal is accepted after 4 seconds unless it was rejected meanwhile.
            reactor.callLater(4, self.accept_proposal, session, streams)

    def _NH_SIPSessionProposalRejected(self, notification):
        session = notification.sender
        # The session may not be tracked (e.g. the proposal was already
        # accepted), in which case there is nothing to remove.
        if session in self.sessions_with_proposals:
            self.sessions_with_proposals.remove(session)

    def _NH_SIPSessionDidRenegotiateStreams(self, notification):
        session = notification.sender
        for stream in notification.data.added_streams:
            notification.center.add_observer(self, sender=stream)
            log.msg(u'%s has added %s to %s' % (format_identity(session.remote_identity), stream.type, self.uri))
            self.dispatch_server_message('%s has added %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
            if stream.type == 'audio':
                log.msg(u'Audio stream using %s/%sHz (%s), end-points: %s:%d <-> %s:%d' % (stream.codec, stream.sample_rate,
                                                                                          'encrypted' if stream.srtp_active else 'unencrypted',
                                                                                          stream.local_rtp_address, stream.local_rtp_port,
                                                                                          stream.remote_rtp_address, stream.remote_rtp_port))
                self.play_audio_welcome(session, False)
        for stream in notification.data.removed_streams:
            notification.center.remove_observer(self, sender=stream)
            log.msg(u'%s has removed %s from %s' % (format_identity(session.remote_identity), stream.type, self.uri))
            self.dispatch_server_message('%s has removed %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
            if stream.type == 'audio':
                try:
                    # Remove the stream that was just dropped from the bridge.
                    self.audio_conference.remove(stream)
                except ValueError:
                    # User may hangup before getting bridged into the conference
                    pass
                if len(self.audio_conference.streams) == 0:
                    self.audio_conference.hold()
            if not session.streams:
                log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri))
                session.end()
        self.get_conference_info()

    def _NH_AudioStreamDidTimeout(self, notification):
        stream = notification.sender
        session = stream.session
        log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity))
        # End the session only when the timed out stream was its only stream.
        if session.streams == [stream]:
            session.end()

    def _NH_ChatStreamGotMessage(self, notification):
        stream = notification.sender
        stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
        # Send MSRP chat message to other participants
        message = notification.data.message
        session = notification.sender.session
        self.incoming_message_queue.send((session, 'msrp_message', message))
        # Send MSRP chat message to IRC chat room
        body = message.body
        sender = message.sender
        irc_message = '%s: %s' % (format_identity(sender), body)
        if self.irc_protocol is not None:
            self.irc_protocol.send_message(irc_message.encode('utf-8'))
        else:
            # Not connected to IRC yet, keep the message until we are.
            self.pending_messages.append(irc_message)

    def _NH_ChatStreamGotNicknameRequest(self, notification):
        # Discard the nickname but pretend we accept it so that XMPP clients can work
        chunk = notification.data.chunk
        notification.sender.accept_nickname(chunk)

    def _NH_IRCBotGotConnected(self, notification):
        self.irc_protocol = notification.data.protocol
        # Send enqueued messages
        while self.pending_messages:
            message = self.pending_messages.pop(0)
            self.irc_protocol.send_message(message.encode('utf-8'))
        # Update participants list
        self.get_conference_info()

    def _NH_IRCBotGotDisconnected(self, notification):
        self.irc_protocol = None

    def _NH_IRCBotGotMessage(self, notification):
        message = notification.data.message
        self.incoming_message_queue.send((None, 'irc_message', message))

    def _NH_IRCBotGotParticipantsList(self, notification):
        self.dispatch_conference_info(notification.data.participants)

    def _NH_IRCBotJoinedChannel(self, notification):
        self.get_conference_info()

    def _NH_IRCBotUserJoined(self, notification):
        self.dispatch_server_message('%s joined the IRC channel' % notification.data.user)
        self.get_conference_info()

    def _NH_IRCBotUserLeft(self, notification):
        self.dispatch_server_message('%s left the IRC channel' % notification.data.user)
        self.get_conference_info()

    def _NH_IRCBotUserQuit(self, notification):
        self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason))
        self.get_conference_info()

    def _NH_IRCBotUserKicked(self, notification):
        data = notification.data
        self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason))
        self.get_conference_info()

    def _NH_IRCBotUserRenamed(self, notification):
        self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname))
        self.get_conference_info()

    def _NH_IRCBotUserAction(self, notification):
        self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action))
class IRCBot(irc.IRCClient):
    """IRC client that reports channel events as notifications.

    Every notification is posted with the bot's factory as sender.
    """
    nickname = 'SylkServer'

    def __init__(self):
        # Nicks accumulate in _nick_collector while a NAMES reply is being
        # received and are moved to nicks when the reply ends.
        self._nick_collector = []
        self.nicks = []

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        log.msg('Connection to IRC has been established')
        center = NotificationCenter()
        center.post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self))

    def connectionLost(self, failure):
        irc.IRCClient.connectionLost(self, failure)
        center = NotificationCenter()
        center.post_notification('IRCBotGotDisconnected', self.factory, NotificationData())

    def signedOn(self):
        log.msg('Logging into %s channel...' % self.factory.channel)
        self.join(self.factory.channel)

    def kickedFrom(self, channel, kicker, message):
        log.msg('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message))
        self.join(self.factory.channel)

    def joined(self, channel):
        log.msg('Logged into %s channel' % channel)
        payload = NotificationData(channel=self.factory.channel)
        NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, payload)

    def privmsg(self, user, channel, message):
        """Turn a channel message into an IRCBotGotMessage notification.

        Messages to '*' and the bot's own messages are dropped; private
        messages are answered with a fixed refusal.
        """
        if channel == '*':
            return
        username = user.split('!', 1)[0]
        if username == self.nickname:
            return
        if channel == self.nickname:
            self.msg(username, "Sorry, I don't support private messages, I'm a bot.")
            return
        sender_uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0]))
        payload = NotificationData(message=IRCMessage(username, sender_uri, message.decode('utf-8')))
        NotificationCenter().post_notification('IRCBotGotMessage', self.factory, payload)

    def send_message(self, message):
        """Say a message in the factory's channel."""
        self.say(self.factory.channel, message)

    def get_participants(self):
        """Send a NAMES request for the factory's channel."""
        self.sendLine("NAMES #%s" % self.factory.channel)

    def got_participants(self, nicks):
        """Post the collected nick list as IRCBotGotParticipantsList."""
        payload = NotificationData(participants=nicks)
        NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, payload)

    def irc_RPL_NAMREPLY(self, prefix, params):
        """Collect usernames from this channel. Several of these
        messages may be sent to cover the channel's full nicklist.
        An RPL_ENDOFNAMES signals the end of the list.
        """
        # The nicks are stored in _nick_collector and handed over to 'nicks'
        # once RPL_ENDOFNAMES arrives.
        for raw_name in params[3].split():
            # Remove operator and voice prefixes
            nick = raw_name[1:] if raw_name[0] in '@+' else raw_name
            if nick == self.nickname:
                continue
            self._nick_collector.append(nick)

    def irc_RPL_ENDOFNAMES(self, prefix, params):
        """This is sent after zero or more RPL_NAMREPLY commands to
        terminate the list of users in a channel.
        """
        self.nicks, self._nick_collector = self._nick_collector, []
        self.got_participants(self.nicks)

    def userJoined(self, user, channel):
        if channel.strip('#') != self.factory.channel:
            return
        payload = NotificationData(user=user)
        NotificationCenter().post_notification('IRCBotUserJoined', self.factory, payload)

    def userLeft(self, user, channel):
        if channel.strip('#') != self.factory.channel:
            return
        payload = NotificationData(user=user)
        NotificationCenter().post_notification('IRCBotUserLeft', self.factory, payload)

    def userQuit(self, user, reason):
        payload = NotificationData(user=user, reason=reason)
        NotificationCenter().post_notification('IRCBotUserQuit', self.factory, payload)

    def userKicked(self, kickee, channel, kicker, message):
        if channel.strip('#') != self.factory.channel:
            return
        payload = NotificationData(kickee=kickee, kicker=kicker, reason=message)
        NotificationCenter().post_notification('IRCBotUserKicked', self.factory, payload)

    def userRenamed(self, oldname, newname):
        payload = NotificationData(oldname=oldname, newname=newname)
        NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, payload)

    def action(self, user, channel, data):
        if channel.strip('#') != self.factory.channel:
            return
        username = user.split('!', 1)[0]
        payload = NotificationData(user=username, action=data)
        NotificationCenter().post_notification('IRCBotUserAction', self.factory, payload)
class IRCBotFactory(protocol.ClientFactory):
    """Client factory for IRCBot that reconnects after a lost connection.

    Setting stop_requested to True disables the reconnect.
    """
    protocol = IRCBot

    def __init__(self, config):
        self.stop_requested = False
        self.config = config
        self.channel = config.channel

    def clientConnectionLost(self, connector, failure):
        log.msg('Disconnected from IRC: %s' % failure.getErrorMessage())
        if self.stop_requested:
            return
        log.msg('Reconnecting...')
        connector.connect()

    def clientConnectionFailed(self, connector, failure):
        # A failed connection attempt is only logged, it is not retried here.
        reason = failure.getErrorMessage()
        log.error('Connection to IRC server failed: %s' % reason)
diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py
index 510b07c..e062359 100644
--- a/sylk/applications/xmppgateway/im.py
+++ b/sylk/applications/xmppgateway/im.py
@@ -1,451 +1,451 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import WriteOnceAttribute
from collections import deque
from eventlib import coros
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader
from sipsimple.core import Message as SIPMessageRequest
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession
from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage
from sylk.extensions import ChatStream
from sylk.session import Session
__all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError']
SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout
class ChatSessionHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self):
    """Create a handler with neither a SIP nor an XMPP leg attached."""
    # Lifecycle flags
    self.started = self.ended = False
    # SIP leg: the session, its MSRP chat stream and the inactivity timer
    self.sip_session = self.msrp_stream = self._sip_session_timer = None
    self.use_receipts = False
    # XMPP leg and the XMPP messages waiting to be relayed
    self.xmpp_session = None
    self._xmpp_message_queue = deque()
    # Items awaiting a delivery report, keyed by message id
    self._pending_msrp_chunks = {}
    self._pending_xmpp_stanzas = {}
def _set_started(self, value):
    """Store the started flag; on a false-to-true transition announce the start.

    The transition posts ChatSessionDidStart and flushes the queued messages.
    """
    was_started = self.__dict__.get('started', False)
    self.__dict__['started'] = value
    if was_started or not value:
        return
    NotificationCenter().post_notification('ChatSessionDidStart', sender=self)
    self._send_queued_messages()

def _get_started(self):
    """Return the stored started flag."""
    return self.__dict__['started']

started = property(_get_started, _set_started)
# The accessors are only needed to build the property
del _get_started, _set_started
def _set_xmpp_session(self, session):
# Property setter for xmpp_session: stores the session in the instance
# __dict__ and, when a session is given, registers this handler as its
# observer and starts it.
self.__dict__['xmpp_session'] = session
if session is not None:
# Reset SIP session timer in case it's active
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.reset(SESSION_TIMEOUT)
NotificationCenter().add_observer(self, sender=session)
session.start()
# Reset SIP session timer in case it's active
# (same check as above, repeated after session.start())
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.reset(SESSION_TIMEOUT)
def _get_xmpp_session(self):
# Property getter: returns the value stored by the setter.
return self.__dict__['xmpp_session']
xmpp_session = property(_get_xmpp_session, _set_xmpp_session)
# The accessor functions are only needed to build the property.
del _get_xmpp_session, _set_xmpp_session
@classmethod
def new_from_sip_session(cls, sip_identity, session):
    """Build a handler for an incoming SIP session.

    The handler gets `sip_identity` assigned and is asked to start handling
    `session` before it is returned.
    """
    handler = cls()
    handler.sip_identity = sip_identity
    handler._start_incoming_sip_session(session)
    return handler
@classmethod
def new_from_xmpp_stanza(cls, xmpp_identity, recipient):
    """Build a handler for a chat initiated from the XMPP side.

    The handler gets `xmpp_identity` assigned and is asked to start an
    outgoing SIP session towards `recipient` before it is returned.
    """
    handler = cls()
    handler.xmpp_identity = xmpp_identity
    handler._start_outgoing_sip_session(recipient)
    return handler
@run_in_green_thread
def _start_incoming_sip_session(self, session):
# Adopt an incoming SIP session: pick the first proposed stream whose type
# is 'chat', observe both the session and that stream, then accept the
# session with only that stream.
# NOTE(review): no default is given when picking the stream, so
# StopIteration is raised if no 'chat' stream was proposed.
self.sip_session = session
- self.msrp_stream = (stream for stream in session.proposed_streams if stream.type=='chat').next()
+ self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat')
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.sip_session)
notification_center.add_observer(self, sender=self.msrp_stream)
self.sip_session.accept([self.msrp_stream])
@run_in_green_thread
def _start_outgoing_sip_session(self, target_uri):
    """Open an outgoing SIP chat session towards `target_uri`.

    The From and Contact URIs are derived from the XMPP identity. The next
    hop is the account's outbound proxy when one is configured, otherwise the
    target itself. If the DNS lookup fails, ChatSessionDidFail is posted and
    no session is created.
    """
    center = NotificationCenter()
    # self.xmpp_identity is our local identity
    from_uri = self.xmpp_identity.uri.as_sip_uri()
    del from_uri.parameters['gr'] # no GRUU in From header
    contact_uri = self.xmpp_identity.uri.as_sip_uri()
    gruu = contact_uri.parameters['gr'].decode('utf-8')
    contact_uri.parameters['gr'] = encode_resource(gruu)
    to_uri = target_uri.as_sip_uri()
    lookup = DNSLookup()
    settings = SIPSimpleSettings()
    account = AccountManager().sylkserver_account
    proxy = account.sip.outbound_proxy
    if proxy is None:
        uri = to_uri
    else:
        uri = SIPURI(host=proxy.host,
                     port=proxy.port,
                     parameters={'transport': proxy.transport})
    try:
        routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
    except DNSLookupError:
        log.warning('DNS lookup error while looking for %s proxy' % uri)
        failure = NotificationData(reason='DNS lookup error')
        center.post_notification('ChatSessionDidFail', sender=self, data=failure)
        return
    self.msrp_stream = ChatStream()
    # Only the first route returned by the lookup is used
    route = routes.pop(0)
    from_header = FromHeader(from_uri)
    to_header = ToHeader(to_uri)
    contact_header = ContactHeader(contact_uri)
    self.sip_session = Session(account)
    center.add_observer(self, sender=self.sip_session)
    center.add_observer(self, sender=self.msrp_stream)
    self.sip_session.connect(from_header, to_header, contact_header=contact_header, routes=[route], streams=[self.msrp_stream])
def end(self):
# Tear down both legs of the chat session and announce the outcome.
# The `ended` flag turns any later call into a no-op.
if self.ended:
return
# Cancel the inactivity timer if it is still pending
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.cancel()
self._sip_session_timer = None
notification_center = NotificationCenter()
if self.sip_session is not None:
# Stop observing the SIP session and its stream, then end the session
notification_center.remove_observer(self, sender=self.sip_session)
notification_center.remove_observer(self, sender=self.msrp_stream)
self.sip_session.end()
self.sip_session = None
self.msrp_stream = None
if self.xmpp_session is not None:
# Same for the XMPP leg
notification_center.remove_observer(self, sender=self.xmpp_session)
self.xmpp_session.end()
self.xmpp_session = None
self.ended = True
# A handler that never reached the started state reports a failure
# instead of a normal end
if self.started:
notification_center.post_notification('ChatSessionDidEnd', sender=self)
else:
notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started'))
def enqueue_xmpp_message(self, message):
    """Queue an XMPP message while the session has not started yet.

    Raises RuntimeError if the session is already started.
    """
    if not self.started:
        self._xmpp_message_queue.append(message)
        return
    raise RuntimeError('session is already started')
def _send_queued_messages(self):
    """Relay every queued XMPP message over the MSRP stream.

    Messages without a body are dropped. Delivery reports are requested only
    for messages that asked for a receipt. After the queue has been drained,
    one 'idle' composing indication is sent using the identity of the last
    message that was actually relayed.
    """
    # Identity of the last relayed message. It stays None when the queue held
    # only body-less messages, in which case there is nothing to follow up
    # with (previously this path hit an unbound local variable).
    sender = None
    while self._xmpp_message_queue:
        message = self._xmpp_message_queue.popleft()
        if message.body is None:
            continue
        if not message.use_receipt:
            success_report = 'no'
            failure_report = 'no'
        else:
            success_report = 'yes'
            failure_report = 'yes'
        sender_uri = message.sender.uri.as_sip_uri()
        sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8'))
        sender = CPIMIdentity(sender_uri)
        self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report)
    if sender is not None:
        self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender)
def _inactivity_timeout(self):
    """Timer callback: log the call id and end the SIP session."""
    call_id = self.sip_session._invitation.call_id
    log.msg("Ending SIP session %s due to inactivity" % call_id)
    self.sip_session.end()
def handle_notification(self, notification):
    """Dispatch a notification to the `_NH_<name>` method, if one exists.

    Notifications without a matching method fall through to `Null`.
    """
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPSessionDidStart(self, notification):
# The SIP leg is up: arm the inactivity timer, then bring up or wake the
# XMPP side depending on which side initiated the session.
log.msg("SIP session %s started" % notification.sender._invitation.call_id)
self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout)
if self.sip_session.direction == 'outgoing':
# Time to set sip_identity and create the XMPPChatSession
contact_uri = self.sip_session._invitation.remote_contact_header.uri
# Use the 'gr' parameter of the remote contact as resource when present,
# otherwise generate one
if contact_uri.parameters.get('gr') is not None:
sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr'))
else:
tmp = self.sip_session.remote_identity.uri
sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource())
self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name)
session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
self.xmpp_session = session
# Session is now established on both ends
self.started = True
# Try to wakeup XMPP clients
self.xmpp_session.send_composing_indication('active')
self.xmpp_session.send_message(' ', 'text/plain')
else:
if self.xmpp_session is not None:
# Session is now established on both ends
self.started = True
# Try to wakeup XMPP clients
self.xmpp_session.send_composing_indication('active')
self.xmpp_session.send_message(' ', 'text/plain')
else:
# Try to wakeup XMPP clients
# No XMPP session exists yet, so the stanza goes out through the XMPP manager
sender = self.sip_identity
tmp = self.sip_session.local_identity.uri
recipient_uri = FrozenURI(tmp.user, tmp.host)
recipient = Identity(recipient_uri)
xmpp_manager = XMPPManager()
xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain'))
# Send queued messages
self._send_queued_messages()
def _NH_SIPSessionDidEnd(self, notification):
    """Forget the ended SIP session and its stream, then end the handler."""
    call_id = notification.sender._invitation.call_id
    log.msg("SIP session %s ended" % call_id)
    center = notification.center
    center.remove_observer(self, sender=self.sip_session)
    center.remove_observer(self, sender=self.msrp_stream)
    self.sip_session = self.msrp_stream = None
    self.end()
def _NH_SIPSessionDidFail(self, notification):
    """Forget the failed SIP session and its stream, then end the handler."""
    call_id = notification.sender._invitation.call_id
    log.msg("SIP session %s failed" % call_id)
    center = notification.center
    center.remove_observer(self, sender=self.sip_session)
    center.remove_observer(self, sender=self.msrp_stream)
    self.sip_session = self.msrp_stream = None
    self.end()
def _NH_SIPSessionNewProposal(self, notification):
    """Reject any stream proposal that originates from the remote party."""
    if notification.data.originator != 'remote':
        return
    self.sip_session.reject_proposal()
def _NH_SIPSessionTransferNewIncoming(self, notification):
    """Reject every incoming transfer request with code 403."""
    session = self.sip_session
    session.reject_transfer(403)
def _NH_ChatStreamGotMessage(self, notification):
# Relay a message received over MSRP to the XMPP side.
# Notification is sent by the MSRP stream
message = notification.data.message
content_type = message.content_type.lower()
# Only plain text and HTML payloads are relayed; anything else is ignored
if content_type not in ('text/plain', 'text/html'):
return
if content_type == 'text/plain':
html_body = None
body = message.body
else:
html_body = message.body
body = None
# Incoming traffic counts as activity: push the inactivity timeout back
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.reset(SESSION_TIMEOUT)
chunk = notification.data.chunk
if self.started:
self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id)
if self.use_receipts:
# Keep the chunk so the report can be sent once the XMPP side reports back
self._pending_msrp_chunks[chunk.message_id] = chunk
else:
self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK')
else:
# Not started yet: send a standalone stanza to the first recipient
# listed in the message and report success right away
sender = self.sip_identity
recipient_uri = FrozenURI.parse(message.recipients[0].uri)
recipient = Identity(recipient_uri, message.recipients[0].display_name)
xmpp_manager = XMPPManager()
xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body))
self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK')
def _NH_ChatStreamGotComposingIndication(self, notification):
    """Forward an MSRP composing indication to the XMPP session.

    'active' is forwarded as 'composing' and 'idle' as 'paused'; any other
    state is ignored, and nothing is forwarded before the session started.
    """
    # Notification is sent by the MSRP stream
    timer = self._sip_session_timer
    if timer is not None and timer.active():
        timer.reset(SESSION_TIMEOUT)
    if not self.started:
        return
    msrp_state = notification.data.state
    if msrp_state == 'active':
        self.xmpp_session.send_composing_indication('composing')
    elif msrp_state == 'idle':
        self.xmpp_session.send_composing_indication('paused')
def _NH_ChatStreamDidDeliverMessage(self, notification):
    """Acknowledge the pending XMPP stanza matching a delivered MSRP message."""
    if not self.started:
        return
    stanza = self._pending_xmpp_stanzas.pop(notification.data.message_id, None)
    if stanza is None:
        return
    self.xmpp_session.send_receipt_acknowledgement(stanza.id)
def _NH_ChatStreamDidNotDeliverMessage(self, notification):
    """Report an error for the pending XMPP stanza whose MSRP delivery failed."""
    if not self.started:
        return
    stanza = self._pending_xmpp_stanzas.pop(notification.data.message_id, None)
    if stanza is None:
        return
    self.xmpp_session.send_error(stanza, 'TODO', []) # TODO
def _NH_XMPPChatSessionDidStart(self, notification):
    """Mark the handler as started if a SIP session is already attached."""
    if self.sip_session is None:
        return
    # Session is now established on both ends
    self.started = True
def _NH_XMPPChatSessionDidEnd(self, notification):
    """Forget the ended XMPP session, then end the handler."""
    center = notification.center
    center.remove_observer(self, sender=self.xmpp_session)
    self.xmpp_session = None
    self.end()
def _NH_XMPPChatSessionGotMessage(self, notification):
# Relay a message received from the XMPP session over the MSRP stream.
# While the SIP session is missing or not connected the message is queued instead
if self.sip_session is None or self.sip_session.state != 'connected':
self._xmpp_message_queue.append(notification.data.message)
return
# Traffic counts as activity: push the inactivity timeout back
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.reset(SESSION_TIMEOUT)
message = notification.data.message
sender_uri = message.sender.uri.as_sip_uri()
del sender_uri.parameters['gr'] # no GRUU in CPIM From header
sender = CPIMIdentity(sender_uri)
# Remember whether the latest XMPP message asked for a receipt
self.use_receipts = message.use_receipt
if not message.use_receipt:
success_report = 'no'
failure_report = 'no'
else:
success_report = 'yes'
failure_report = 'yes'
# Kept so the stanza can be looked up by message id when the MSRP delivery result arrives
self._pending_xmpp_stanzas[message.id] = message
# Prefer plaintext
self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report)
self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender)
def _NH_XMPPChatSessionGotComposingIndication(self, notification):
# Forward an XMPP chat state to the MSRP stream as a composing indication.
# Nothing is forwarded while the SIP session is missing or not connected
if self.sip_session is None or self.sip_session.state != 'connected':
return
if self._sip_session_timer is not None and self._sip_session_timer.active():
self._sip_session_timer.reset(SESSION_TIMEOUT)
message = notification.data.message
# XMPP 'composing' maps to MSRP 'active' and 'paused' to 'idle';
# other states leave `state` as None
state = None
if message.state == 'composing':
state = 'active'
elif message.state == 'paused':
state = 'idle'
if state is not None:
sender_uri = message.sender.uri.as_sip_uri()
del sender_uri.parameters['gr'] # no GRUU in CPIM From header
sender = CPIMIdentity(sender_uri)
self.msrp_stream.send_composing_indication(state, 30, local_identity=sender)
# A requested receipt is acknowledged directly on the XMPP session
if message.use_receipt:
self.xmpp_session.send_receipt_acknowledgement(message.id)
def _NH_XMPPChatSessionDidDeliverMessage(self, notification):
    """Send a 200 OK report for the pending MSRP chunk that XMPP delivered."""
    pending = self._pending_msrp_chunks.pop(notification.data.message_id, None)
    if pending is None:
        return
    self.msrp_stream.msrp_session.send_report(pending, 200, 'OK')
def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification):
    """Send a failure report for the pending MSRP chunk that XMPP could not deliver.

    The report carries the code and reason found in the notification data.
    """
    failure = notification.data
    pending = self._pending_msrp_chunks.pop(failure.message_id, None)
    if pending is None:
        return
    self.msrp_stream.msrp_session.send_report(pending, failure.code, failure.reason)
def chunks(text, size):
    """Yield consecutive slices of `text`, each at most `size` items long."""
    for start in xrange(0, len(text), size):
        stop = start + size
        yield text[start:stop]
class SIPMessageError(Exception):
    """Error raised when a SIP MESSAGE could not be sent.

    `reason` is used as the exception message; `code` and `reason` are also
    kept as attributes.
    """

    def __init__(self, code, reason):
        super(SIPMessageError, self).__init__(reason)
        self.code, self.reason = code, reason
class SIPMessageSender(object):
implements(IObserver)
def __init__(self, message):
    """Prepare the From/To URIs and the body for sending `message` over SIP.

    The 'gr' parameter is removed from both URIs and the content type is
    fixed to 'text/plain'.
    """
    # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client
    # replies to one of our messages. MESSAGE requests don't need a Contact header, though
    # so how should we communicate our GRUU to the recipient?
    self.from_uri = message.sender.uri.as_sip_uri()
    self.from_uri.parameters.pop('gr', None)   # No GRUU in From header
    self.to_uri = message.recipient.uri.as_sip_uri()
    self.to_uri.parameters.pop('gr', None)   # Don't send it to the GRUU
    self.body, self.content_type = message.body, 'text/plain'
    # Requests in flight, and the channel on which their results arrive
    self._requests = set()
    self._channel = coros.queue()
@run_in_waitable_green_thread
def send(self):
    """Send the message body to the recipient as one or more SIP MESSAGE requests.

    The next hop is the account's outbound proxy when one is configured,
    otherwise it is looked up from the recipient URI. The body is split into
    pieces of at most 1000 characters and each piece goes out in its own
    request; the method then waits until every request has reported back.

    Raises SIPMessageError with code 0 if the DNS lookup fails, or with the
    code and reason of the most recently reported failure if any request failed.
    """
    lookup = DNSLookup()
    settings = SIPSimpleSettings()
    account = AccountManager().sylkserver_account
    if account.sip.outbound_proxy is not None:
        uri = SIPURI(host=account.sip.outbound_proxy.host,
                     port=account.sip.outbound_proxy.port,
                     parameters={'transport': account.sip.outbound_proxy.transport})
    else:
        uri = self.to_uri
    try:
        routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
    except DNSLookupError:
        msg = 'DNS lookup error while looking for %s proxy' % uri
        log.warning(msg)
        raise SIPMessageError(0, msg)
    else:
        # Only the first route returned by the lookup is used
        route = routes.pop(0)
        from_header = FromHeader(self.from_uri)
        to_header = ToHeader(self.to_uri)
        route_header = RouteHeader(route.uri)
        notification_center = NotificationCenter()
        for chunk in chunks(self.body, 1000):
            # Each request carries its own piece; passing self.body here
            # would repeat the complete body in every request
            request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, chunk)
            notification_center.add_observer(self, sender=request)
            self._requests.add(request)
            request.send()
        error = None
        count = len(self._requests)
        # One notification (success or failure) is expected per request
        while count > 0:
            notification = self._channel.wait()
            if notification.name == 'SIPMessageDidFail':
                error = (notification.data.code, notification.data.reason)
            count -= 1
        self._requests.clear()
        if error is not None:
            raise SIPMessageError(*error)
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch a notification to the `_NH_<name>` method, if one exists.

    Notifications without a matching method fall through to `Null`.
    """
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPMessageDidSucceed(self, notification):
    """Stop observing the finished request and hand the result to send()."""
    finished_request = notification.sender
    notification.center.remove_observer(self, sender=finished_request)
    self._channel.send(notification)
def _NH_SIPMessageDidFail(self, notification):
    """Stop observing the failed request and hand the result to send()."""
    failed_request = notification.sender
    notification.center.remove_observer(self, sender=failed_request)
    self._channel.send(notification)
diff --git a/sylk/applications/xmppgateway/presence.py b/sylk/applications/xmppgateway/presence.py
index ba2415c..0443eb6 100644
--- a/sylk/applications/xmppgateway/presence.py
+++ b/sylk/applications/xmppgateway/presence.py
@@ -1,504 +1,504 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import hashlib
import random
from application.notification import IObserver, NotificationCenter
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError
from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader
from sipsimple.core import Subscription
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import pidf, rpid, caps
from sipsimple.payloads import ParserError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from sipsimple.util import ISOTimestamp
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.util import format_uri
from sylk.applications.xmppgateway.xmpp.stanzas import AvailabilityPresence
from sylk.applications.xmppgateway.xmpp.subscription import XMPPSubscription, XMPPIncomingSubscription
from sylk.configuration import SIPConfig
__all__ = ['S2XPresenceHandler', 'X2SPresenceHandler']
class S2XPresenceHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
    """Create a presence flow between a SIP identity and an XMPP identity.

    No subscription exists yet on either side.
    """
    self.ended = False
    # SIP side: incoming subscriptions served by this flow
    self._sip_subscriptions = []
    # Latest presence stanza per sender URI, and the PIDF built from them
    self._stanza_cache = {}
    self._pidf = self._xmpp_subscription = None
    self.sip_identity = sip_identity
    self.xmpp_identity = xmpp_identity
def start(self):
    """Create, observe and start the XMPP subscription, then announce the start."""
    center = NotificationCenter()
    subscription = XMPPSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
    self._xmpp_subscription = subscription
    center.add_observer(self, sender=subscription)
    subscription.start()
    center.post_notification('S2XPresenceHandlerDidStart', sender=self)
def end(self):
# Tear down the XMPP subscription and every SIP subscription of this flow,
# then post S2XPresenceHandlerDidEnd. The `ended` flag turns any later
# call into a no-op.
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_subscription is not None:
notification_center.remove_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.end()
self._xmpp_subscription = None
while self._sip_subscriptions:
subscription = self._sip_subscriptions.pop()
notification_center.remove_observer(self, sender=subscription)
# Best effort: a SIPCoreError raised while ending a subscription is ignored
try:
subscription.end()
except SIPCoreError:
pass
self.ended = True
notification_center.post_notification('S2XPresenceHandlerDidEnd', sender=self)
def add_sip_subscription(self, subscription):
# Attach an incoming SIP subscription to this presence flow and answer it.
# If a subscription is received after the handler has ended but before
# S2XPresenceHandlerDidEnd has been processed we need to ignore it and wait for a retransmission
# which we will handle by creating a new S2XPresenceHandler
if self.ended:
return
self._sip_subscriptions.append(subscription)
NotificationCenter().add_observer(self, sender=subscription)
# Accept right away (with the current PIDF, if any) when the XMPP
# subscription is active; otherwise accept as pending
if self._xmpp_subscription.state == 'active':
pidf_doc = self._pidf
content_type = pidf.PIDFDocument.content_type if pidf_doc is not None else None
subscription.accept(content_type, pidf_doc)
else:
subscription.accept_pending()
log.msg('SIP subscription from %s to %s added to presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _build_pidf(self):
# Build a PIDF document from the cached XMPP presence stanzas, store its
# XML form in self._pidf and return it. With an empty cache self._pidf is
# cleared and None is returned.
if not self._stanza_cache:
self._pidf = None
return None
pidf_doc = pidf.PIDF(str(self.xmpp_identity))
# The person id is derived from user@host of the first cached sender URI
- uri = self._stanza_cache.iterkeys().next()
+ uri = next(self._stanza_cache.iterkeys())
person = pidf.Person("PID-%s" % hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest())
person.activities = rpid.Activities()
pidf_doc.add(person)
# One service element is added per cached stanza
for stanza in self._stanza_cache.itervalues():
if not stanza.available:
status = pidf.Status('closed')
status.extended = 'offline'
else:
status = pidf.Status('open')
# XMPP 'away' and 'xa' both map to 'away', 'dnd' maps to 'busy',
# anything else to 'available'
if stanza.show == 'away':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'xa':
status.extended = 'away'
if 'away' not in person.activities:
person.activities.add('away')
elif stanza.show == 'dnd':
status.extended = 'busy'
if 'busy' not in person.activities:
person.activities.add('busy')
else:
status.extended = 'available'
if stanza.sender.uri.resource:
resource = encode_resource(stanza.sender.uri.resource)
else:
# Workaround for clients not sending the resource under certain (unknown) circumstances
resource = hashlib.md5("%s@%s" % (uri.user, uri.host)).hexdigest()
service_id = "SID-%s" % resource
sip_uri = stanza.sender.uri.as_sip_uri()
sip_uri.parameters['gr'] = resource
sip_uri.parameters['xmpp'] = None
contact = pidf.Contact(str(sip_uri))
service = pidf.Service(service_id, status=status, contact=contact)
service.add(pidf.DeviceID(resource))
service.device_info = pidf.DeviceInfo(resource, description=stanza.sender.uri.resource)
service.timestamp = pidf.ServiceTimestamp(stanza.timestamp)
service.capabilities = caps.ServiceCapabilities(text=True, message=True)
for lang, note in stanza.statuses.iteritems():
service.notes.add(pidf.PIDFNote(note, lang=lang))
pidf_doc.add(service)
# Drop the activities element when no activity was recorded
if not person.activities:
person.activities = None
self._pidf = pidf_doc.toxml()
return self._pidf
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch a notification to the `_NH_<name>` method, if one exists.

    Notifications without a matching method fall through to `Null`.
    """
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
    """Drop an ended SIP subscription; end the flow when it was the last one."""
    ended_subscription = notification.sender
    notification.center.remove_observer(self, sender=ended_subscription)
    self._sip_subscriptions.remove(ended_subscription)
    sip_uri = format_uri(self.sip_identity.uri, 'sip')
    xmpp_uri = format_uri(self.xmpp_identity.uri, 'xmpp')
    log.msg('SIP subscription from %s to %s removed from presence flow 0x%x (%d subs)' % (sip_uri, xmpp_uri, id(self), len(self._sip_subscriptions)))
    if self._sip_subscriptions:
        return
    self.end()
def _NH_SIPIncomingSubscriptionNotifyDidFail(self, notification):
    """Log a failed SIP NOTIFY together with its code and reason."""
    xmpp_uri = format_uri(self.xmpp_identity.uri, 'xmpp')
    sip_uri = format_uri(self.sip_identity.uri, 'sip')
    failure = notification.data
    log.msg('Sending SIP NOTIFY failed from %s to %s for presence flow 0x%x: %s (%s)' % (xmpp_uri, sip_uri, id(self), failure.code, failure.reason))
def _NH_SIPIncomingSubscriptionGotUnsubscribe(self, notification):
    """Log that the SIP subscription was terminated by the user.

    The flow id is printed with the same '0x' hexadecimal prefix as every
    other presence-flow log message (it previously read '1x').
    """
    log.msg('SIP subscription from %s to %s was terminated by user for presence flow 0x%x (%d subs)' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp'), id(self), len(self._sip_subscriptions)))
def _NH_SIPIncomingSubscriptionGotRefreshingSubscribe(self, notification):
    """Log that the SIP subscription was refreshed."""
    sip_uri = format_uri(self.sip_identity.uri, 'sip')
    xmpp_uri = format_uri(self.xmpp_identity.uri, 'xmpp')
    log.msg('SIP subscription from %s to %s was refreshed for presence flow 0x%x (%d subs)' % (sip_uri, xmpp_uri, id(self), len(self._sip_subscriptions)))
def _NH_SIPIncomingSubscriptionDidTimeout(self, notification):
    """Log that the SIP subscription timed out."""
    sip_uri = format_uri(self.sip_identity.uri, 'sip')
    xmpp_uri = format_uri(self.xmpp_identity.uri, 'xmpp')
    log.msg('SIP subscription from %s to %s timed out for presence flow 0x%x (%d subs)' % (sip_uri, xmpp_uri, id(self), len(self._sip_subscriptions)))
def _NH_XMPPSubscriptionChangedState(self, notification):
    """Accept pending SIP subscriptions once the XMPP subscription becomes active.

    Only the transition from 'subscribe_sent' to 'active' triggers this; the
    current PIDF document, if any, is sent along with the acceptance.
    """
    change = notification.data
    if change.prev_state != 'subscribe_sent' or change.state != 'active':
        return
    pidf_doc = self._pidf
    content_type = None if pidf_doc is None else pidf.PIDFDocument.content_type
    for subscription in self._sip_subscriptions:
        if subscription.state == 'pending':
            subscription.accept(content_type, pidf_doc)
def _NH_XMPPSubscriptionGotNotify(self, notification):
    """Cache an incoming XMPP presence stanza and push fresh PIDF to SIP subscribers.

    The stanza is stored under its sender URI, the PIDF document is rebuilt
    and pushed to every SIP subscription. A SIPCoreError raised while pushing
    is logged and does not stop the remaining subscriptions from being served.
    """
    stanza = notification.data.presence
    self._stanza_cache[stanza.sender.uri] = stanza
    stanza.timestamp = ISOTimestamp.now() # TODO: mirror the one in the stanza, if present
    pidf_doc = self._build_pidf()
    log.msg('XMPP NOTIFY from %s to %s for presence flow 0x%x' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self)))
    for subscription in self._sip_subscriptions:
        try:
            subscription.push_content(pidf.PIDFDocument.content_type, pidf_doc)
        # `except ... as` is accepted by Python 2.6+ and Python 3, unlike the comma form
        except SIPCoreError as e:
            log.msg('Failed to send SIP NOTIFY from %s to %s for presence flow 0x%x: %s' % (format_uri(self.xmpp_identity.uri, 'xmpp'), format_uri(self.sip_identity.uri, 'sip'), id(self), e))
    if not stanza.available:
        # Only inform once about this device being unavailable
        del self._stanza_cache[stanza.sender.uri]
def _NH_XMPPSubscriptionDidFail(self, notification):
    """Forget the XMPP subscription that failed or ended, then end the flow."""
    center = notification.center
    center.remove_observer(self, sender=self._xmpp_subscription)
    self._xmpp_subscription = None
    self.end()

# A subscription that ended is handled exactly like one that failed
_NH_XMPPSubscriptionDidEnd = _NH_XMPPSubscriptionDidFail
class InterruptSubscription(Exception):
    """Exception used by the 'subscribe' command handler to kill the running subscription proc."""
class TerminateSubscription(Exception):
    """Exception used by the 'unsubscribe' command handler to kill the running subscription proc."""
class SubscriptionError(Exception):
    """Failure of a SIP subscription attempt.

    Carries the error description, a timeout, an optional refresh interval
    and a flag telling whether the failure is fatal.
    """

    def __init__(self, error, timeout, refresh_interval=None, fatal=False):
        self.error, self.timeout = error, timeout
        self.refresh_interval, self.fatal = refresh_interval, fatal
class SIPSubscriptionDidFail(Exception):
    """Exception carrying the data of a failed SIP subscription in `data`."""

    def __init__(self, data):
        setattr(self, 'data', data)
class X2SPresenceHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity):
    """Create a presence flow from an XMPP identity towards a SIP identity.

    No subscription, proc or timer exists yet.
    """
    self.ended = False
    self.sip_identity = sip_identity
    self.xmpp_identity = xmpp_identity
    self.subscribed = False
    # Command loop proc, with one channel for commands and one for data
    self._command_proc = None
    self._command_channel = coros.queue()
    self._data_channel = coros.queue()
    # SIP subscription state
    self._sip_subscription = self._sip_subscription_proc = self._sip_subscription_timer = None
    self._xmpp_subscription = None
def start(self):
    """Start the incoming XMPP subscription, the command loop and the SIP subscription.

    X2SPresenceHandlerDidStart is posted once everything has been set in motion.
    """
    center = NotificationCenter()
    subscription = XMPPIncomingSubscription(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
    self._xmpp_subscription = subscription
    center.add_observer(self, sender=subscription)
    subscription.start()
    self._command_proc = proc.spawn(self._run)
    self._subscribe_sip()
    center.post_notification('X2SPresenceHandlerDidStart', sender=self)
def end(self):
# Tear down the XMPP subscription, request the SIP unsubscribe if a SIP
# subscription exists, then post X2SPresenceHandlerDidEnd. The `ended`
# flag turns any later call into a no-op.
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_subscription is not None:
notification_center.remove_observer(self, sender=self._xmpp_subscription)
self._xmpp_subscription.end()
self._xmpp_subscription = None
if self._sip_subscription:
self._unsubscribe_sip()
self.ended = True
notification_center.post_notification('X2SPresenceHandlerDidEnd', sender=self)
@run_in_green_thread
def _subscribe_sip(self):
    """Queue a 'subscribe' command for the command loop."""
    self._command_channel.send(Command('subscribe'))
@run_in_green_thread
def _unsubscribe_sip(self):
    """Queue an 'unsubscribe' command, wait for it, then kill the command loop."""
    unsubscribe = Command('unsubscribe')
    self._command_channel.send(unsubscribe)
    unsubscribe.wait()
    self._command_proc.kill()
    self._command_proc = None
def _run(self):
    """Command loop: wait for commands and dispatch each to its `_CH_<name>` method.

    The loop never returns by itself; an unknown command name raises
    AttributeError.
    """
    while True:
        command = self._command_channel.wait()
        handler_name = '_CH_%s' % command.name
        getattr(self, handler_name)(command)
def _CH_subscribe(self, command):
# 'subscribe' command: (re)start the SIP subscription proc.
# Cancel any pending subscription timer first
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
# Interrupt a subscription proc that is already running and wait for it
# before spawning the new one
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._sip_subscription_proc = proc.spawn(self._sip_subscription_handler, command)
def _CH_unsubscribe(self, command):
# 'unsubscribe' command: stop the SIP subscription proc and signal the
# command so its sender can stop waiting.
# Cancel any timer which would restart the subscription process
if self._sip_subscription_timer is not None and self._sip_subscription_timer.active():
self._sip_subscription_timer.cancel()
self._sip_subscription_timer = None
# Terminate the running subscription proc, if any, and wait for it
if self._sip_subscription_proc is not None:
subscription_proc = self._sip_subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._sip_subscription_proc = None
command.signal()
def _process_pidf(self, body):
# Parse a PIDF document and send one XMPP availability presence stanza
# per service it contains. A document that fails to parse is logged and
# ignored.
try:
pidf_doc = pidf.PIDF.parse(body)
except ParserError, e:
log.warn('Error parsing PIDF document: %s' % e)
return
# Build XML stanzas out of PIDF documents
# Only the first person element, if any, is considered
try:
- person = (p for p in pidf_doc.persons).next()
+ person = next(p for p in pidf_doc.persons)
except StopIteration:
person = None
for service in pidf_doc.services:
sip_contact = self.sip_identity.uri.as_sip_uri()
# The 'gr' parameter comes from the device info id when present,
# otherwise from the service id
if service.device_info is not None:
sip_contact.parameters['gr'] = 'urn:uuid:%s' % service.device_info.id
else:
sip_contact.parameters['gr'] = service.id
sender = Identity(FrozenURI.parse(sip_contact))
# The extended status wins over the basic one when it is set
if service.status.extended is not None:
available = service.status.extended != 'offline'
else:
available = service.status.basic == 'open'
stanza = AvailabilityPresence(sender, self.xmpp_identity, available)
for note in service.notes:
stanza.statuses[note.lang] = note
# Map to the XMPP 'show' value: from the extended status if set,
# otherwise from the person's activities
if service.status.extended is not None:
if service.status.extended == 'away':
stanza.show = 'away'
elif service.status.extended == 'busy':
stanza.show = 'dnd'
elif person is not None and person.activities is not None:
activities = set(list(person.activities))
if 'away' in activities:
stanza.show = 'away'
elif set(('holiday', 'vacation')).intersection(activities):
stanza.show = 'xa'
elif 'busy' in activities:
stanza.show = 'dnd'
self._xmpp_subscription.send_presence(stanza)
def _sip_subscription_handler(self, command):
# Body of the green process spawned by _CH_subscribe: establishes a SIP
# 'presence' subscription to self.sip_identity on behalf of self.xmpp_identity,
# then processes the notifications delivered through self._data_channel.
# It is interrupted from outside by killing the process with
# InterruptSubscription or TerminateSubscription.
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
account = AccountManager().sylkserver_account
# A refresh interval carried by the command overrides the account setting
refresh_interval = getattr(command, 'refresh_interval', None) or account.sip.subscribe_interval
try:
# Lookup routes
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = SIPURI(host=self.sip_identity.uri.as_sip_uri().host)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
# From here on `timeout` is an absolute deadline: all routes together get 30 seconds
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
# The transport parameter is left out of the contact URI for UDP
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
subscription_uri = self.sip_identity.uri.as_sip_uri()
subscription = Subscription(subscription_uri, FromHeader(self.xmpp_identity.uri.as_sip_uri()),
ToHeader(subscription_uri),
ContactHeader(contact_uri),
'presence',
RouteHeader(route.uri),
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
raise SubscriptionError(error='Internal error', timeout=5)
self._sip_subscription = subscription
# Wait until this subscription starts; a failure arrives as a
# SIPSubscriptionDidFail exception raised by the data channel
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail, e:
notification_center.remove_observer(self, sender=subscription)
self._sip_subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
raise SubscriptionError(error='Authentication failed', timeout=random.uniform(60, 120))
elif e.data.code == 403:
# Forbidden
raise SubscriptionError(error='Forbidden', timeout=None, fatal=True)
elif e.data.code == 423:
# Get the value of the Min-Expires header
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
interval = e.data.min_expires
else:
interval = None
raise SubscriptionError(error='Interval too short', timeout=random.uniform(60, 120), refresh_interval=interval)
elif e.data.code in (405, 406, 489):
raise SubscriptionError(error='Method or event not supported', timeout=None, fatal=True)
elif e.data.code == 1400:
raise SubscriptionError(error=e.data.reason, timeout=None, fatal=True)
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, give up
raise SubscriptionError(error='No more routes to try', timeout=None, fatal=True)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
# Notifications from other senders, or arriving while there is no XMPP
# subscription, are dropped
if notification.sender is not self._sip_subscription:
continue
if self._xmpp_subscription is None:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'presence':
subscription_state = notification.data.headers.get('Subscription-State').state
if subscription_state == 'active' and self._xmpp_subscription.state != 'active':
self._xmpp_subscription.accept()
elif subscription_state == 'pending' and self._xmpp_subscription.state == 'active':
# The state went from active to pending, hide the presence state?
pass
if notification.data.body:
log.msg('SIP NOTIFY from %s to %s' % (format_uri(self.sip_identity.uri, 'sip'), format_uri(self.xmpp_identity.uri, 'xmpp')))
self._process_pidf(notification.data.body)
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail, e:
# Code 0 with reason 'rejected' rejects the XMPP subscription; any other
# failure queues a new subscribe command
if e.data.code == 0 and e.data.reason == 'rejected':
self._xmpp_subscription.reject()
else:
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._sip_subscription)
except InterruptSubscription, e:
# Sent by _CH_subscribe before it spawns a replacement process: end the
# subscription without waiting for SIPSubscriptionDidEnd
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
notification_center.remove_observer(self, sender=self._sip_subscription)
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
except TerminateSubscription, e:
# Sent by _CH_unsubscribe: end the subscription and, unless end() raised
# SIPCoreError, wait for SIPSubscriptionDidEnd (or a failure)
if not self.subscribed:
command.signal(e)
if self._sip_subscription is not None:
try:
self._sip_subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._sip_subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._sip_subscription)
except SubscriptionError, e:
# Non-fatal errors schedule a new subscribe command after e.timeout seconds
if not e.fatal:
self._sip_subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, Command('subscribe', command.event, refresh_interval=e.refresh_interval))
finally:
# Reset the subscription state and schedule self.end() on the reactor
self.subscribed = False
self._sip_subscription = None
self._sip_subscription_proc = None
reactor.callLater(0, self.end)
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch *notification* to the ``_NH_<name>`` method, if one exists.

    Notifications without a matching method are handed to ``Null``.
    """
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
    """Deliver the failure to the data channel as a SIPSubscriptionDidFail exception."""
    failure = SIPSubscriptionDidFail(notification.data)
    self._data_channel.send_exception(failure)
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_XMPPIncomingSubscriptionGotUnsubscribe(self, notification):
self.end()
def _NH_XMPPIncomingSubscriptionGotSubscribe(self, notification):
if self._sip_subscription is not None and self._sip_subscription.state.lower() == 'active':
self._xmpp_subscription.accept()
_NH_XMPPIncomingSubscriptionGotProbe = _NH_XMPPIncomingSubscriptionGotSubscribe
diff --git a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
index bccb47a..d815987 100644
--- a/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
+++ b/sylk/applications/xmppgateway/xmpp/stanzas/__init__.py
@@ -1,285 +1,285 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import hashlib
from twisted.words.xish import domish
from sylk import __version__ as SYLK_VERSION
from sylk.applications.xmppgateway.util import html2text
# XML namespace URIs used when building stanza elements
CHATSTATES_NS = 'http://jabber.org/protocol/chatstates'
RECEIPTS_NS = 'urn:xmpp:receipts'
STANZAS_NS = 'urn:ietf:params:xml:ns:xmpp-stanzas'
XML_NS = 'http://www.w3.org/XML/1998/namespace'
MUC_NS = 'http://jabber.org/protocol/muc'
MUC_USER_NS = MUC_NS + '#user'
CAPS_NS = 'http://jabber.org/protocol/caps'
# SHA-1 hex digest of 'SylkServer-<version>', sent as the 'ver' attribute of the caps element
SYLK_HASH = hashlib.sha1('SylkServer-%s' % SYLK_VERSION).hexdigest()
# When non-empty, joined with spaces into the 'ext' attribute of the caps element
SYLK_CAPS = []
class BaseStanza(object):
    """Common base for stanzas: keeps sender, recipient and an optional id."""

    stanza_type = None # to be defined by subclasses
    type = None

    def __init__(self, sender, recipient, id=None):
        self.sender, self.recipient, self.id = sender, recipient, id

    def to_xml_element(self):
        """Return a domish element named after stanza_type with from/to/type/id set.

        'type' is only set when it is truthy and 'id' only when it is not None.
        """
        element = domish.Element((None, self.stanza_type))
        for attribute, identity in (('from', self.sender), ('to', self.recipient)):
            element[attribute] = unicode(identity.uri.as_xmpp_jid())
        if self.type:
            element['type'] = self.type
        if self.id is not None:
            element['id'] = self.id
        return element
class ErrorStanza(object):
    """
    Stanza representing an error of another stanza. It's not a base stanza type on its own.

    ``conditions`` is an iterable of ``(condition, namespace)`` pairs; each pair
    becomes one child of the ``error`` element.
    """

    def __init__(self, stanza_type, sender, recipient, error_type, conditions, id=None):
        self.stanza_type = stanza_type
        self.sender = sender
        self.recipient = recipient
        self.id = id
        self.conditions = conditions
        self.error_type = error_type

    @classmethod
    def from_stanza(cls, stanza, error_type, conditions):
        """Build the error reply for *stanza*, reusing its stanza type and id."""
        # In error stanzas sender and recipient are swapped
        return cls(stanza.stanza_type, stanza.recipient, stanza.sender, error_type, conditions, id=stanza.id)

    def to_xml_element(self):
        """Return a domish element of type 'error' carrying an ``error`` child."""
        xml_element = domish.Element((None, self.stanza_type))
        xml_element['from'] = unicode(self.sender.uri.as_xmpp_jid())
        xml_element['to'] = unicode(self.recipient.uri.as_xmpp_jid())
        xml_element['type'] = 'error'
        if self.id is not None:
            xml_element['id'] = self.id
        error_element = domish.Element((None, 'error'))
        error_element['type'] = self.error_type
        # A plain loop: the children are added for their side effect only
        for condition, ns in self.conditions:
            error_element.addChild(domish.Element((ns, condition)))
        xml_element.addChild(error_element)
        return xml_element
class BaseMessageStanza(BaseStanza):
    """Base for message stanzas carrying an optional plain and/or HTML body."""

    stanza_type = 'message'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        super(BaseMessageStanza, self).__init__(sender, recipient, id=id)
        self.use_receipt = use_receipt
        # When only the HTML body is given, the plain body is derived from it
        if body is None and html_body is not None:
            body = html2text(html_body)
        self.body = body
        self.html_body = html_body

    def to_xml_element(self):
        """Return the base element extended with receipt request, body and html."""
        element = super(BaseMessageStanza, self).to_xml_element()
        # A receipt is only requested for stanzas with an id whose recipient has a resource
        wants_receipt = self.id is not None and self.recipient.uri.resource is not None and self.use_receipt
        if wants_receipt:
            element.addElement('request', defaultUri=RECEIPTS_NS)
        for tag, content in (('body', self.body), ('html', self.html_body)):
            if content is not None:
                element.addElement(tag, content=content)
        return element
class NormalMessage(BaseMessageStanza):
    """Message stanza that sets no type of its own; needs a body or an HTML body."""

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=False):
        if html_body is None and body is None:
            raise ValueError('either body or html_body need to be set')
        super(NormalMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)
class ChatMessage(BaseMessageStanza):
    """Message stanza of type 'chat'; receipts are requested by default."""

    type = 'chat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None, use_receipt=True):
        if html_body is None and body is None:
            raise ValueError('either body or html_body need to be set')
        super(ChatMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=use_receipt)

    def to_xml_element(self):
        """Return the message element with an 'active' chat state child added."""
        element = super(ChatMessage, self).to_xml_element()
        element.addElement('active', defaultUri=CHATSTATES_NS)
        return element
class ChatComposingIndication(BaseMessageStanza):
    """Body-less 'chat' message that only carries a chat state element."""

    type = 'chat'

    def __init__(self, sender, recipient, state, id=None, use_receipt=False):
        super(ChatComposingIndication, self).__init__(sender, recipient, id=id, use_receipt=use_receipt)
        # state is used verbatim as the name of the chat state element
        self.state = state

    def to_xml_element(self):
        """Return the message element with the chat state child added."""
        element = super(ChatComposingIndication, self).to_xml_element()
        element.addElement(self.state, defaultUri=CHATSTATES_NS)
        return element
class GroupChatMessage(BaseMessageStanza):
    """Message stanza of type 'groupchat'; receipts are never requested."""

    type = 'groupchat'

    def __init__(self, sender, recipient, body=None, html_body=None, id=None):
        # TODO: add timestamp
        if html_body is None and body is None:
            raise ValueError('either body or html_body need to be set')
        super(GroupChatMessage, self).__init__(sender, recipient, body=body, html_body=html_body, id=id, use_receipt=False)
class MessageReceipt(BaseMessageStanza):
    """Body-less message acknowledging the message whose id is receipt_id."""

    def __init__(self, sender, recipient, receipt_id, id=None):
        super(MessageReceipt, self).__init__(sender, recipient, id=id, use_receipt=False)
        self.receipt_id = receipt_id

    def to_xml_element(self):
        """Return the message element with a 'received' child holding receipt_id."""
        element = super(MessageReceipt, self).to_xml_element()
        received = domish.Element((RECEIPTS_NS, 'received'))
        received['id'] = self.receipt_id
        element.addChild(received)
        return element
class BasePresenceStanza(BaseStanza):
    """Base for all stanzas serialized as 'presence' elements."""

    stanza_type = 'presence'
class IncomingInvitationMessage(BaseMessageStanza):
    """Body-less message carrying a MUC invite addressed 'to' invited_user."""

    def __init__(self, sender, recipient, invited_user, reason=None, id=None):
        super(IncomingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.invited_user = invited_user
        self.reason = reason

    def to_xml_element(self):
        """Return the message element with the MUC user 'x/invite' child added."""
        element = super(IncomingInvitationMessage, self).to_xml_element()
        muc_user = element.addElement((MUC_USER_NS, 'x'))
        invite = muc_user.addElement('invite')
        invite['to'] = unicode(self.invited_user.uri.as_xmpp_jid())
        # The reason is optional and left out when empty
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
class OutgoingInvitationMessage(BaseMessageStanza):
    """Body-less message carrying a MUC invite marked as coming 'from' originator."""

    def __init__(self, sender, recipient, originator, reason=None, id=None):
        super(OutgoingInvitationMessage, self).__init__(sender, recipient, body=None, html_body=None, id=id, use_receipt=False)
        self.originator = originator
        self.reason = reason

    def to_xml_element(self):
        """Return the message element with the MUC user 'x/invite' child added."""
        element = super(OutgoingInvitationMessage, self).to_xml_element()
        muc_user = element.addElement((MUC_USER_NS, 'x'))
        invite = muc_user.addElement('invite')
        invite['from'] = unicode(self.originator.uri.as_xmpp_jid())
        # The reason is optional and left out when empty
        if self.reason:
            invite.addElement('reason', content=self.reason)
        return element
class AvailabilityPresence(BasePresenceStanza):
# Presence stanza announcing availability, with an optional show value,
# priority and per-language status texts.
def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None):
super(AvailabilityPresence, self).__init__(sender, recipient, id=id)
# Assigning available goes through the property below, which also sets self.type
self.available = available
self.show = show
self.priority = priority
# statuses maps a language (or None) to the status text
self.statuses = statuses or {}
def _get_available(self):
return self.__dict__['available']
def _set_available(self, available):
# Unavailable presence is expressed through the stanza type
if available:
self.type = None
else:
self.type = 'unavailable'
self.__dict__['available'] = available
available = property(_get_available, _set_available)
del _get_available, _set_available
@property
def status(self):
# The status stored under the None key if there is one, otherwise an
# arbitrary status from the dictionary, or None when it is empty
status = self.statuses.get(None)
if status is None:
try:
- status = self.statuses.itervalues().next()
+ status = next(self.statuses.itervalues())
except StopIteration:
pass
return status
def to_xml_element(self):
# NOTE(review): super() is given BasePresenceStanza rather than
# AvailabilityPresence, so a to_xml_element defined on BasePresenceStanza
# would be skipped -- it defines none at present; confirm this is intended
xml_element = super(BasePresenceStanza, self).to_xml_element()
if self.available:
if self.show is not None:
xml_element.addElement('show', content=self.show)
# A priority of 0 is not serialized
if self.priority != 0:
xml_element.addElement('priority', content=unicode(self.priority))
# Capabilities element advertising this server
caps = xml_element.addElement('c', defaultUri=CAPS_NS)
caps['node'] = 'http://sylkserver.com'
caps['hash'] = 'sha-1'
caps['ver'] = SYLK_HASH
if SYLK_CAPS:
caps['ext'] = ' '.join(SYLK_CAPS)
# One status element per entry; the lang attribute is only set for a truthy language
for lang, text in self.statuses.iteritems():
status = xml_element.addElement('status', content=text)
if lang:
status[(XML_NS, 'lang')] = lang
return xml_element
class SubscriptionPresence(BasePresenceStanza):
    """Presence stanza whose type is chosen by the caller."""

    def __init__(self, sender, recipient, type, id=None):
        super(SubscriptionPresence, self).__init__(sender, recipient, id=id)
        # Overrides the class-level type for this instance
        self.type = type
class ProbePresence(BasePresenceStanza):
    """Presence stanza of type 'probe'."""

    type = 'probe'
class MUCAvailabilityPresence(AvailabilityPresence):
    """Availability presence extended with MUC user data.

    affiliation defaults to 'member', role to 'participant' and muc_statuses
    to an empty list whenever a falsy value is passed.
    """

    def __init__(self, sender, recipient, available=True, show=None, statuses=None, priority=0, id=None, affiliation=None, jid=None, role=None, muc_statuses=None):
        super(MUCAvailabilityPresence, self).__init__(sender, recipient, available=available, show=show, statuses=statuses, priority=priority, id=id)
        self.affiliation = affiliation or 'member'
        self.role = role or 'participant'
        self.muc_statuses = muc_statuses or []
        self.jid = jid

    def to_xml_element(self):
        """Return the presence element with the MUC user 'x' child added."""
        element = super(MUCAvailabilityPresence, self).to_xml_element()
        muc_user = element.addElement('x', defaultUri=MUC_USER_NS)
        item = muc_user.addElement('item')
        # Only truthy values end up as attributes of the item element
        for attribute, value in (('affiliation', self.affiliation), ('role', self.role)):
            if value:
                item[attribute] = value
        if self.jid:
            item['jid'] = unicode(self.jid.uri.as_xmpp_jid())
        for status_code in self.muc_statuses:
            muc_user.addElement('status')['code'] = status_code
        return element
class MUCErrorPresence(ErrorStanza):
    """Error stanza that additionally carries an empty MUC user 'x' element."""

    def to_xml_element(self):
        element = super(MUCErrorPresence, self).to_xml_element()
        element.addElement('x', defaultUri=MUC_USER_NS)
        return element
diff --git a/sylk/session.py b/sylk/session.py
index e8bfcd7..3b5300f 100644
--- a/sylk/session.py
+++ b/sylk/session.py
@@ -1,626 +1,626 @@
# Copyright (C) 2011 AG Projects. See LICENSE for details.
#
from __future__ import with_statement
import random
from datetime import datetime
from time import time
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.types import Singleton
from eventlib import api, coros, proc
from sipsimple.account import AccountManager
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, Invitation, Subscription, SIPCoreError, sip_status_messages
from sipsimple.core import ContactHeader, RouteHeader, SubjectHeader, FromHeader, ToHeader
from sipsimple.core import SIPURI, SDPConnection, SDPSession
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import ParserError
from sipsimple.payloads.conference import ConferenceDocument
from sipsimple.session import Session as _Session
from sipsimple.session import SessionReplaceHandler, TransferHandler, DialogID, TransferInfo
from sipsimple.session import InvitationDisconnectedError, MediaStreamDidFailError, InterruptSubscription, TerminateSubscription, SubscriptionError, SIPSubscriptionDidFail
from sipsimple.session import transition_state
from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import Command, run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.configuration import SIPConfig
class ConferenceHandler(object):
# Maintains a SIP 'conference' event subscription towards the remote party of
# a session and posts SIPSessionGotConferenceInfo for the documents received.
# Commands are queued on _command_channel and executed one by one by the _run
# process; notifications reach the subscription process through _data_channel.
implements(IObserver)
def __init__(self, session):
self.session = session
self.active = False
self.subscribed = False
self._command_proc = None
self._command_channel = coros.queue()
self._data_channel = coros.queue()
self._subscription = None
self._subscription_proc = None
self._subscription_timer = None
self._wakeup_timer = None
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
notification_center.add_observer(self, name='DNSNameserversDidChange')
notification_center.add_observer(self, name='SystemIPAddressDidChange')
notification_center.add_observer(self, name='SystemDidWakeUpFromSleep')
self._command_proc = proc.spawn(self._run)
def _run(self):
# Command loop: each command is dispatched to the matching _CH_<name> method
while True:
command = self._command_channel.wait()
handler = getattr(self, '_CH_%s' % command.name)
handler(command)
def _activate(self):
# Mark the handler active and queue a subscribe command, which is returned
self.active = True
command = Command('subscribe')
self._command_channel.send(command)
return command
def _deactivate(self):
# Mark the handler inactive and queue an unsubscribe command, which is returned
self.active = False
command = Command('unsubscribe')
self._command_channel.send(command)
return command
def _resubscribe(self):
# Queue a subscribe command without touching self.active
command = Command('subscribe')
self._command_channel.send(command)
return command
def _terminate(self):
# Stop observing, unsubscribe, then stop the command loop and wait for that
# to happen before dropping the session reference
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.session)
notification_center.remove_observer(self, name='DNSNameserversDidChange')
notification_center.remove_observer(self, name='SystemIPAddressDidChange')
notification_center.remove_observer(self, name='SystemDidWakeUpFromSleep')
self._deactivate()
command = Command('terminate')
self._command_channel.send(command)
command.wait()
self.session = None
def _CH_subscribe(self, command):
# Cancel a pending retry timer, interrupt the running subscription process
# (if any) and spawn a new one for this command
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(InterruptSubscription)
subscription_proc.wait()
self._subscription_proc = proc.spawn(self._subscription_handler, command)
def _CH_unsubscribe(self, command):
# Cancel any timer which would restart the subscription process
if self._subscription_timer is not None and self._subscription_timer.active():
self._subscription_timer.cancel()
self._subscription_timer = None
if self._wakeup_timer is not None and self._wakeup_timer.active():
self._wakeup_timer.cancel()
self._wakeup_timer = None
if self._subscription_proc is not None:
subscription_proc = self._subscription_proc
subscription_proc.kill(TerminateSubscription)
subscription_proc.wait()
self._subscription_proc = None
command.signal()
def _CH_terminate(self, command):
# Signal the waiter, then leave the command loop by raising ProcExit
command.signal()
raise proc.ProcExit()
def _subscription_handler(self, command):
# Body of the subscription process: subscribes to the 'conference' event,
# trying the looked up routes in order, then handles notifications until the
# subscription ends, fails or the process is killed from outside
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
try:
# Lookup routes
account = self.session.account
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(self.session.remote_identity.uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
timeout = random.uniform(15, 30)
raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout)
target_uri = SIPURI.new(self.session.remote_identity.uri)
# A refresh interval carried by the command overrides the account setting
refresh_interval = getattr(command, 'refresh_interval', account.sip.subscribe_interval)
# From here on `timeout` is an absolute deadline: all routes together get 30 seconds
timeout = time() + 30
for route in routes:
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
# The transport parameter is left out of the contact URI for UDP
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
subscription = Subscription(target_uri, FromHeader(SIPURI.new(self.session.local_identity.uri)),
ToHeader(target_uri),
ContactHeader(contact_uri),
'conference',
RouteHeader(route.uri),
credentials=account.credentials,
refresh=refresh_interval)
notification_center.add_observer(self, sender=subscription)
try:
subscription.subscribe(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=subscription)
timeout = 5
raise SubscriptionError(error='Internal error', timeout=timeout)
self._subscription = subscription
# Wait until this subscription starts; a failure arrives as a
# SIPSubscriptionDidFail exception raised by the data channel
try:
while True:
notification = self._data_channel.wait()
if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart':
break
except SIPSubscriptionDidFail, e:
notification_center.remove_observer(self, sender=subscription)
self._subscription = None
if e.data.code == 407:
# Authentication failed, so retry the subscription in some time
timeout = random.uniform(60, 120)
raise SubscriptionError(error='Authentication failed', timeout=timeout)
elif e.data.code == 423:
# Get the value of the Min-Expires header
timeout = random.uniform(60, 120)
if e.data.min_expires is not None and e.data.min_expires > refresh_interval:
raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires)
else:
raise SubscriptionError(error='Interval too short', timeout=timeout)
elif e.data.code in (405, 406, 489, 1400):
# These codes end the attempt without scheduling a retry
command.signal(e)
return
else:
# Otherwise just try the next route
continue
else:
self.subscribed = True
command.signal()
break
else:
# There are no more routes to try, reschedule the subscription
timeout = random.uniform(60, 180)
raise SubscriptionError(error='No more routes to try', timeout=timeout)
# At this point it is subscribed. Handle notifications and ending/failures.
try:
while True:
notification = self._data_channel.wait()
# Notifications from other subscriptions are dropped
if notification.sender is not self._subscription:
continue
if notification.name == 'SIPSubscriptionGotNotify':
if notification.data.event == 'conference' and notification.data.body:
# Bodies that fail to parse are silently ignored
try:
conference_info = ConferenceDocument.parse(notification.data.body)
except ParserError:
pass
else:
notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info))
elif notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
# A failure of the established subscription queues a new subscribe command
self._command_channel.send(Command('subscribe'))
notification_center.remove_observer(self, sender=self._subscription)
except InterruptSubscription, e:
# Sent by _CH_subscribe before it spawns a replacement process: end the
# subscription without waiting for SIPSubscriptionDidEnd
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
notification_center.remove_observer(self, sender=self._subscription)
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
except TerminateSubscription, e:
# Sent by _CH_unsubscribe: end the subscription and, unless end() raised
# SIPCoreError, wait for SIPSubscriptionDidEnd (or a failure)
if not self.subscribed:
command.signal(e)
if self._subscription is not None:
try:
self._subscription.end(timeout=2)
except SIPCoreError:
pass
else:
try:
while True:
notification = self._data_channel.wait()
if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd':
break
except SIPSubscriptionDidFail:
pass
finally:
notification_center.remove_observer(self, sender=self._subscription)
except SubscriptionError, e:
# Schedule a new subscribe command after e.timeout seconds, carrying the
# Min-Expires value as the refresh interval when the error provides one
if 'min_expires' in e.attributes:
command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires'])
else:
command = Command('subscribe', command.event)
self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command)
finally:
self.subscribed = False
self._subscription = None
self._subscription_proc = None
@run_in_twisted_thread
def handle_notification(self, notification):
# Dispatch to the matching _NH_<name> method; unknown names go to Null
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# The subscription notifications below are forwarded to the subscription
# process through the data channel; a failure is delivered as an exception
def _NH_SIPSubscriptionDidStart(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidEnd(self, notification):
self._data_channel.send(notification)
def _NH_SIPSubscriptionDidFail(self, notification):
self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data))
def _NH_SIPSubscriptionGotNotify(self, notification):
self._data_channel.send(notification)
def _NH_SIPSessionDidStart(self, notification):
# Only subscribe when the session has remote_focus set
if self.session.remote_focus:
self._activate()
@run_in_green_thread
def _NH_SIPSessionDidFail(self, notification):
self._terminate()
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
self._terminate()
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
# Follow changes of the session's remote_focus flag
if self.session.remote_focus and not self.active:
self._activate()
elif not self.session.remote_focus and self.active:
self._deactivate()
def _NH_DNSNameserversDidChange(self, notification):
if self.active:
self._resubscribe()
def _NH_SystemIPAddressDidChange(self, notification):
if self.active:
self._resubscribe()
def _NH_SystemDidWakeUpFromSleep(self, notification):
# Schedule a single delayed resubscription; while one is pending further
# wake-up notifications are ignored
if self._wakeup_timer is None:
def wakeup_action():
if self.active:
self._resubscribe()
self._wakeup_timer = None
self._wakeup_timer = reactor.callLater(5, wakeup_action) # wait for system to stabilize
class Session(_Session):
def init_incoming(self, invitation, data):
# Initialize the session from an incoming invitation: build the proposed
# streams from the remote SDP, record focus/subject/transfer information and
# post SIPSessionNewIncoming.
# The invitation is answered with 488 when no stream could be created and
# with 481 when a Replaces header does not match any existing session.
remote_sdp = invitation.sdp.proposed_remote
self.proposed_streams = []
if remote_sdp:
for index, media_stream in enumerate(remote_sdp.media):
# Media entries with port 0 are skipped
if media_stream.port != 0:
# Try the registered stream types in turn: UnknownStreamError moves on to
# the next type, InvalidStreamError gives up on this media entry
for stream_type in MediaStreamRegistry():
try:
stream = stream_type.new_from_sdp(self, remote_sdp, index)
except InvalidStreamError:
break
except UnknownStreamError:
continue
else:
stream.index = index
self.proposed_streams.append(stream)
break
if not self.proposed_streams:
invitation.send_response(488)
return
self.direction = 'incoming'
self.state = 'incoming'
self.transport = invitation.transport
self._invitation = invitation
self.conference = ConferenceHandler(self)
self.transfer_handler = TransferHandler(self)
if 'isfocus' in invitation.remote_contact_header.parameters:
self.remote_focus = True
# The subject is stored directly in the instance dictionary; a missing
# Subject header leaves it untouched
try:
self.__dict__['subject'] = data.headers['Subject'].subject
except KeyError:
pass
if 'Referred-By' in data.headers or 'Replaces' in data.headers:
self.transfer_info = TransferInfo()
if 'Referred-By' in data.headers:
self.transfer_info.referred_by = data.headers['Referred-By'].body
if 'Replaces' in data.headers:
replaces_header = data.headers.get('Replaces')
# The header's to-tag is used as the local tag and its from-tag as the remote tag
replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag)
session_manager = SessionManager()
# Find the first existing session whose invitation has that dialog id
try:
- self.replaced_session = (session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id).next()
+ self.replaced_session = next(session for session in session_manager.sessions if session._invitation is not None and session._invitation.dialog_id == replaced_dialog_id)
except StopIteration:
invitation.send_response(481)
return
else:
self.transfer_info.replaced_dialog_id = replaced_dialog_id
replace_handler = SessionReplaceHandler(self)
replace_handler.start()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=invitation)
notification_center.post_notification('SIPSessionNewIncoming', self, NotificationData(streams=self.proposed_streams, headers=data.headers))
@transition_state(None, 'connecting')
@run_in_green_thread
def connect(self, from_header, to_header, routes, streams, contact_header=None, is_focus=False, subject=None, extra_headers=None):
    """
    Set up an outgoing session: initialize the proposed streams, send the
    INVITE and wait until both the invitation and the streams are up.

    from_header / to_header -- stored as the local / remote identity and
        passed to Invitation.send_invite.
    routes -- sequence of routes; only the first one is used.
    streams -- the streams to propose; each one is initialized with
        direction 'outgoing' and offered in the local SDP in list order.
    contact_header -- Contact header to use; when None it is built from
        self.account.contact[self.route]. Note that the 'isfocus' parameter
        is set on the given object when is_focus is true.
    is_focus -- stored as self.local_focus and advertised via 'isfocus'.
    subject -- session subject; when the session has a subject a
        SubjectHeader is added to the outgoing headers.
    extra_headers -- optional iterable of additional headers for the INVITE.
        It is copied before use, so neither the caller's list nor a shared
        default is ever modified.

    Nothing useful is returned; the outcome is reported through
    notifications (SIPSessionDidStart on success, SIPSessionDidFail /
    SIPSessionDidEnd or self._fail() otherwise).
    """
    self.greenlet = api.getcurrent()
    settings = SIPSimpleSettings()

    connected = False
    received_code = 0
    received_reason = None
    # 'connected' state changes seen after the first one; replayed through
    # handle_notification once the session has started
    unhandled_notifications = []

    self.direction = 'outgoing'
    self.proposed_streams = streams
    self.route = routes[0]
    self.transport = self.route.transport
    self.local_focus = is_focus
    self._invitation = Invitation()
    self._local_identity = from_header
    self._remote_identity = to_header
    self.conference = ConferenceHandler(self)
    self.transfer_handler = Null
    self.__dict__['subject'] = subject

    notification_center = NotificationCenter()
    notification_center.add_observer(self, sender=self._invitation)
    notification_center.post_notification('SIPSessionNewOutgoing', self, NotificationData(streams=streams))
    for stream in self.proposed_streams:
        notification_center.add_observer(self, sender=stream)
        stream.initialize(self, direction='outgoing')

    try:
        # Wait for every proposed stream to report that it was initialized
        wait_count = len(self.proposed_streams)
        while wait_count > 0:
            notification = self._channel.wait()
            if notification.name == 'MediaStreamDidInitialize':
                wait_count -= 1

        if contact_header is None:
            try:
                contact_uri = self.account.contact[self.route]
            except KeyError as e:
                for stream in self.proposed_streams:
                    notification_center.remove_observer(self, sender=stream)
                    stream.deactivate()
                    stream.end()
                self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e))
                return
            else:
                contact_header = ContactHeader(contact_uri)

        # Build the SDP offer, one media entry per proposed stream
        local_ip = contact_header.uri.host
        local_sdp = SDPSession(local_ip, connection=SDPConnection(local_ip), name=settings.user_agent)
        stun_addresses = []
        for index, stream in enumerate(self.proposed_streams):
            stream.index = index
            media = stream.get_local_media(for_offer=True)
            local_sdp.media.append(media)
            stun_addresses.extend((value.split(' ', 5)[4] for value in media.attributes.getall('candidate') if value.startswith('S ')))
        if stun_addresses:
            local_sdp.connection.address = stun_addresses[0]

        route_header = RouteHeader(self.route.uri)
        if is_focus:
            contact_header.parameters['isfocus'] = None
        # Work on a private copy: appending to the argument itself would
        # modify the caller's list (or a shared default) on every call.
        extra_headers = list(extra_headers) if extra_headers else []
        if self.subject:
            extra_headers.append(SubjectHeader(self.subject))
        self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, extra_headers=extra_headers)

        # Wait for the SDP negotiation to finish, relaying provisional
        # responses while doing so
        try:
            with api.timeout(settings.sip.invite_timeout):
                while True:
                    notification = self._channel.wait()
                    if notification.name == 'SIPInvitationGotSDPUpdate':
                        if notification.data.succeeded:
                            local_sdp = notification.data.local_sdp
                            remote_sdp = notification.data.remote_sdp
                            break
                        else:
                            for stream in self.proposed_streams:
                                notification_center.remove_observer(self, sender=stream)
                                stream.deactivate()
                                stream.end()
                            self._fail(originator='remote', code=received_code, reason=received_reason, error='SDP negotiation failed: %s' % notification.data.error)
                            return
                    elif notification.name == 'SIPInvitationChangedState':
                        if notification.data.state == 'early':
                            if notification.data.code == 180:
                                notification_center.post_notification('SIPSessionGotRingIndication', self)
                            notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
                        elif notification.data.state == 'connecting':
                            received_code = notification.data.code
                            received_reason = notification.data.reason
                        elif notification.data.state == 'connected':
                            if not connected:
                                connected = True
                                notification_center.post_notification('SIPSessionDidProcessTransaction', self,
                                                                      NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
                            else:
                                unhandled_notifications.append(notification)
                        elif notification.data.state == 'disconnected':
                            raise InvitationDisconnectedError(notification.sender, notification.data)
        except api.TimeoutError:
            self.greenlet = None
            self.end()
            return

        notification_center.post_notification('SIPSessionWillStart', self)
        stream_map = dict((stream.index, stream) for stream in self.proposed_streams)
        for index, local_media in enumerate(local_sdp.media):
            remote_media = remote_sdp.media[index]
            stream = stream_map[index]
            if remote_media.port:
                stream.start(local_sdp, remote_sdp, index)
            else:
                # A zero port in the remote media means the stream was not accepted
                notification_center.remove_observer(self, sender=stream)
                self.proposed_streams.remove(stream)
                del stream_map[stream.index]
                stream.deactivate()
                stream.end()
        # Streams whose index is beyond the negotiated local SDP are dropped too
        removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)]
        for stream in removed_streams:
            notification_center.remove_observer(self, sender=stream)
            self.proposed_streams.remove(stream)
            del stream_map[stream.index]
            stream.deactivate()
            stream.end()

        # Wait for the remaining streams to start; invitation state changes
        # received meanwhile are put back on the channel afterwards
        invitation_notifications = []
        with api.timeout(self.media_stream_timeout):
            wait_count = len(self.proposed_streams)
            while wait_count > 0:
                notification = self._channel.wait()
                if notification.name == 'MediaStreamDidStart':
                    wait_count -= 1
                elif notification.name == 'SIPInvitationChangedState':
                    invitation_notifications.append(notification)
        for pending_notification in invitation_notifications:
            self._channel.send(pending_notification)

        while not connected or self._channel:
            notification = self._channel.wait()
            if notification.name == 'SIPInvitationChangedState':
                if notification.data.state == 'early':
                    if notification.data.code == 180:
                        notification_center.post_notification('SIPSessionGotRingIndication', self)
                    notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason))
                elif notification.data.state == 'connecting':
                    received_code = notification.data.code
                    received_reason = notification.data.reason
                elif notification.data.state == 'connected':
                    if not connected:
                        connected = True
                        notification_center.post_notification('SIPSessionDidProcessTransaction', self,
                                                              NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason))
                    else:
                        unhandled_notifications.append(notification)
                elif notification.data.state == 'disconnected':
                    raise InvitationDisconnectedError(notification.sender, notification.data)
    except (MediaStreamDidFailError, api.TimeoutError) as e:
        for stream in self.proposed_streams:
            notification_center.remove_observer(self, sender=stream)
            stream.deactivate()
            stream.end()
        if isinstance(e, api.TimeoutError):
            error = 'media stream timed out while starting'
        else:
            error = 'media stream failed: %s' % e.data.reason
        self._fail(originator='local', code=received_code, reason=received_reason, error=error)
    except InvitationDisconnectedError as e:
        notification_center.remove_observer(self, sender=self._invitation)
        for stream in self.proposed_streams:
            notification_center.remove_observer(self, sender=stream)
            stream.deactivate()
            stream.end()
        self.state = 'terminated'
        # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE
        if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE':
            notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator))
            if e.data.originator == 'remote':
                notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200]))
            self.end_time = datetime.now()
            notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason))
        else:
            if e.data.originator == 'remote':
                notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason))
                code = e.data.code
                reason = e.data.reason
            elif e.data.disconnect_reason == 'timeout':
                code = 408
                reason = 'timeout'
            else:
                code = 0
                reason = None
            if e.data.originator == 'remote' and code // 100 == 3:
                redirect_identities = e.data.headers.get('Contact', [])
            else:
                redirect_identities = None
            notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities))
        self.greenlet = None
    except SIPCoreError as e:
        for stream in self.proposed_streams:
            notification_center.remove_observer(self, sender=stream)
            stream.deactivate()
            stream.end()
        self._fail(originator='local', code=received_code, reason=received_reason, error='SIP core error: %s' % str(e))
    else:
        self.greenlet = None
        self.state = 'connected'
        self.streams = self.proposed_streams
        self.proposed_streams = None
        self.start_time = datetime.now()
        notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams))
        for notification in unhandled_notifications:
            self.handle_notification(notification)
        if self._hold_in_progress:
            self._send_hold()
class SessionManager(object):
    """
    Singleton that keeps the list of live sessions and creates a Session for
    every incoming invitation.
    """

    __metaclass__ = Singleton

    implements(IObserver)

    # Notification names observed between start() and stop(), in the order
    # in which the observers are registered and unregistered
    _observed_notifications = ('SIPInvitationChangedState',
                               'SIPSessionNewIncoming',
                               'SIPSessionNewOutgoing',
                               'SIPSessionDidFail',
                               'SIPSessionDidEnd')

    def __init__(self):
        self.sessions = []
        self.state = None
        self._channel = coros.queue()

    def start(self):
        """Register for the session related notifications and announce the start."""
        self.state = 'starting'
        center = NotificationCenter()
        center.post_notification('SIPSessionManagerWillStart', sender=self)
        for notification_name in self._observed_notifications:
            center.add_observer(self, notification_name)
        self.state = 'started'
        center.post_notification('SIPSessionManagerDidStart', sender=self)

    def stop(self):
        """End all sessions, wait until none is left, then unregister the observers."""
        self.state = 'stopping'
        center = NotificationCenter()
        center.post_notification('SIPSessionManagerWillEnd', sender=self)
        for active_session in self.sessions:
            active_session.end()
        # handle_notification() removes ended/failed sessions from the list and
        # pushes the notification on the channel while the state is 'stopping'
        while self.sessions:
            self._channel.wait()
        for notification_name in self._observed_notifications:
            center.remove_observer(self, notification_name)
        self.state = 'stopped'
        center.post_notification('SIPSessionManagerDidEnd', sender=self)

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Track session creation/termination and set up sessions for incoming invitations."""
        event = notification.name
        if event == 'SIPInvitationChangedState':
            if notification.data.state == 'incoming':
                account = AccountManager().sylkserver_account
                invitation = notification.sender
                invitation.send_response(100)
                incoming_session = Session(account)
                incoming_session.init_incoming(invitation, notification.data)
        elif event == 'SIPSessionNewIncoming' or event == 'SIPSessionNewOutgoing':
            self.sessions.append(notification.sender)
        elif event == 'SIPSessionDidFail' or event == 'SIPSessionDidEnd':
            self.sessions.remove(notification.sender)
            if self.state == 'stopping':
                # wake up stop(), which is waiting for the session list to drain
                self._channel.send(notification)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:05 AM (20 h, 44 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408793
Default Alt Text
(207 KB)

Event Timeline