Page MenuHomePhabricator

No OneTemporary

diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py
index 5dda11b..afc6eb3 100644
--- a/sylk/applications/conference/__init__.py
+++ b/sylk/applications/conference/__init__.py
@@ -1,438 +1,438 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details
#
import mimetypes
import os
import re
import shutil
from application.notification import IObserver, NotificationCenter
from application.python import Null
from gnutls.interfaces.twisted import X509Credentials
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError
from sipsimple.core import Header, ContactHeader, FromHeader, ToHeader, SubjectHeader
from sipsimple.lookup import DNSLookup
from sipsimple.session import IllegalStateError
from sipsimple.streams import AudioStream
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications import SylkApplication
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.applications.conference.room import Room
from sylk.applications.conference.web import ScreenSharingWebServer
from sylk.bonjour import BonjourServices
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
-from sylk.extensions import ChatStream
from sylk.session import Session
+from sylk.streams import ChatStream
from sylk.tls import Certificate, PrivateKey
class ACLValidationError(Exception):
    """Raised by ConferenceApplication.validate_acl when a URI is not allowed by the room access list."""
class RoomNotFoundError(Exception):
    """Raised by ConferenceApplication.get_room when the room does not exist and creation was not requested."""
class ConferenceApplication(SylkApplication):
    """Conference focus application.

    Keeps one Room per ``user@host`` key in ``_rooms`` and dispatches incoming
    sessions, subscriptions and referrals to it.  ``invited_participants_map``
    maps a room key to ``{participant URI string: session count}`` for
    participants added through add_participant(), so that their SUBSCRIBE
    requests can skip the access list.
    """

    implements(IObserver)

    def __init__(self):
        self._rooms = {}
        self.invited_participants_map = {}
        self.bonjour_focus_service = Null
        self.bonjour_room_service = Null
        self.screen_sharing_web_server = None

    def start(self):
        """Remove stale transfer files and start the Bonjour and screen sharing services."""
        # cleanup old files (best effort, errors such as a missing directory are ignored)
        for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir):
            try:
                shutil.rmtree(path)
            except EnvironmentError:
                pass
        if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference':
            self.bonjour_focus_service = BonjourServices(service='sipfocus')
            self.bonjour_focus_service.start()
            log.msg("Bonjour publication started for service 'sipfocus'")
            self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference')
            self.bonjour_room_service.start()
            self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants')
            log.msg("Bonjour publication started for service 'sipuri'")
        self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir)
        if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None:
            # Certificate and private key are both loaded from the same configured file
            cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized)
            key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized)
            credentials = X509Credentials(cert, key)
        else:
            credentials = None
        self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials)
        listen_address = self.screen_sharing_web_server.listener.getHost()
        log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port))

    def stop(self):
        """Stop the services started by start()."""
        self.bonjour_focus_service.stop()
        self.bonjour_room_service.stop()
        # The web server only exists once start() got far enough to create it
        if self.screen_sharing_web_server is not None:
            self.screen_sharing_web_server.stop()

    def get_room(self, uri, create=False):
        """Return the Room for *uri* (keyed by ``user@host``).

        When the room is unknown it is created if *create* is true, otherwise
        RoomNotFoundError is raised.
        """
        room_uri = '%s@%s' % (uri.user, uri.host)
        try:
            room = self._rooms[room_uri]
        except KeyError:
            if create:
                room = Room(room_uri)
                self._rooms[room_uri] = room
                return room
            else:
                raise RoomNotFoundError
        else:
            return room

    def remove_room(self, uri):
        """Forget the room for *uri*; unknown rooms are ignored."""
        room_uri = '%s@%s' % (uri.user, uri.host)
        self._rooms.pop(room_uri, None)

    def validate_acl(self, room_uri, from_uri):
        """Raise ACLValidationError if *from_uri* may not access the room at *room_uri*.

        With the 'allow,deny' policy the URI must match the allow list and not
        the deny list.  With any other policy it is only refused when it
        matches the deny list without matching the allow list.
        """
        room_uri = '%s@%s' % (room_uri.user, room_uri.host)
        cfg = get_room_config(room_uri)
        if cfg.access_policy == 'allow,deny':
            if cfg.allow.match(from_uri) and not cfg.deny.match(from_uri):
                return
            raise ACLValidationError
        else:
            if cfg.deny.match(from_uri) and not cfg.allow.match(from_uri):
                raise ACLValidationError

    def incoming_session(self, session):
        """Validate an incoming session and schedule its acceptance.

        Rejects with 488 when no audio, chat or file-transfer stream is
        proposed, with 403 when the access list refuses the caller and with
        404 when a requested file (or its room) cannot be found.
        """
        log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
        audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio']
        chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat']
        transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer']
        if not audio_streams and not chat_streams and not transfer_streams:
            log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported')
            session.reject(488)
            return
        try:
            self.validate_acl(session._invitation.request_uri, session.remote_identity.uri)
        except ACLValidationError:
            log.msg(u'Session rejected: unauthorized by access list')
            session.reject(403)
            return
        # Check if requested files belong to this room
        for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'):
            try:
                room = self.get_room(session._invitation.request_uri)
            except RoomNotFoundError:
                log.msg(u'Session rejected: room not found')
                session.reject(404)
                return
            try:
                room_file = next(f for f in room.files if f.hash == stream.file_selector.hash)
            except StopIteration:
                log.msg(u'Session rejected: requested file not found')
                session.reject(404)
                return
            filename = os.path.basename(room_file.name)
            for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)):
                if filename in filenames:
                    path = os.path.join(dirpath, filename)
                    # Binary mode: the file content must be handed over untouched
                    stream.file_selector.fd = open(path, 'rb')
                    if stream.file_selector.size is None:
                        stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size
                    if stream.file_selector.type is None:
                        mime_type, encoding = mimetypes.guess_type(filename)
                        if encoding is not None:
                            content_type = 'application/x-%s' % encoding
                        elif mime_type is not None:
                            content_type = mime_type
                        else:
                            content_type = 'application/octet-stream'
                        stream.file_selector.type = content_type
                    break
            else:
                # File got removed from the filesystem
                log.msg(u'Session rejected: requested file removed from the filesystem')
                session.reject(404)
                return
        NotificationCenter().add_observer(self, sender=session)
        if audio_streams:
            session.send_ring_indication()
        # Accept at most one stream of each kind
        streams = [group[0] for group in (audio_streams, chat_streams, transfer_streams) if group]
        # Audio sessions are accepted after a 4 second delay, others immediately
        reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams)

    def incoming_subscription(self, subscribe_request, data):
        """Validate a SUBSCRIBE and hand it to the room it addresses.

        The room is looked up by request URI first and by To URI second.
        Rejects with 400 (missing From/To), 489 (event other than
        'conference'), 403 (access list) or 480 (room missing or not started).
        """
        from_header = data.headers.get('From', Null)
        to_header = data.headers.get('To', Null)
        if Null in (from_header, to_header):
            subscribe_request.reject(400)
            return
        if subscribe_request.event != 'conference':
            log.msg(u'Subscription rejected: only conference event is supported')
            subscribe_request.reject(489)
            return
        try:
            self.validate_acl(data.request_uri, from_header.uri)
        except ACLValidationError:
            try:
                self.validate_acl(to_header.uri, from_header.uri)
            except ACLValidationError:
                # Check if we need to skip the ACL because this was an invited participant
                if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or
                        str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})):
                    log.msg(u'Subscription rejected: unauthorized by access list')
                    subscribe_request.reject(403)
                    return
        try:
            room = self.get_room(data.request_uri)
        except RoomNotFoundError:
            try:
                room = self.get_room(to_header.uri)
            except RoomNotFoundError:
                log.msg(u'Subscription rejected: room not yet created')
                subscribe_request.reject(480)
                return
        if not room.started:
            log.msg(u'Subscription rejected: room not started yet')
            subscribe_request.reject(480)
        else:
            room.handle_incoming_subscription(subscribe_request, data)

    def incoming_referral(self, refer_request, data):
        """Validate a REFER and delegate it to an IncomingReferralHandler.

        Rejects with 400 when From, To or Refer-To is missing and with 403
        when the access list refuses the sender.
        """
        from_header = data.headers.get('From', Null)
        to_header = data.headers.get('To', Null)
        refer_to_header = data.headers.get('Refer-To', Null)
        if Null in (from_header, to_header, refer_to_header):
            refer_request.reject(400)
            return
        log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri))
        try:
            self.validate_acl(data.request_uri, from_header.uri)
        except ACLValidationError:
            log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri)
            refer_request.reject(403)
            return
        referral_handler = IncomingReferralHandler(refer_request, data)
        referral_handler.start()

    def incoming_message(self, message_request, data):
        """Refuse SIP MESSAGE requests with 405."""
        log.msg(u'SIP MESSAGE is not supported, use MSRP media instead')
        message_request.answer(405)

    def accept_session(self, session, streams):
        """Accept *session* with *streams* if it is still in the 'incoming' state."""
        if session.state == 'incoming':
            try:
                session.accept(streams, is_focus=True)
            except IllegalStateError:
                pass

    def add_participant(self, session, room_uri):
        """Register an outgoing *session* as participant of the room at *room_uri*."""
        # Keep track of the invited participants, we must skip ACL policy
        # for SUBSCRIBE requests
        room_uri_str = '%s@%s' % (room_uri.user, room_uri.host)
        log.msg(u'Room %s - outgoing session to %s started' % (room_uri_str, session.remote_identity.uri))
        d = self.invited_participants_map.setdefault(room_uri_str, {})
        d.setdefault(str(session.remote_identity.uri), 0)
        d[str(session.remote_identity.uri)] += 1
        NotificationCenter().add_observer(self, sender=session)
        room = self.get_room(room_uri, True)
        room.start()
        room.add_session(session)

    def remove_participant(self, participant_uri, room_uri):
        """Terminate the sessions of *participant_uri* in the room, if the room exists."""
        try:
            room = self.get_room(room_uri)
        except RoomNotFoundError:
            pass
        else:
            log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri))
            room.terminate_sessions(participant_uri)

    def handle_notification(self, notification):
        """Dispatch *notification* to the ``_NH_<name>`` method, if there is one."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidStart(self, notification):
        session = notification.sender
        room = self.get_room(session._invitation.request_uri, True)    # FIXME
        room.start()
        room.add_session(session)

    @run_in_green_thread
    def _NH_SIPSessionDidEnd(self, notification):
        session = notification.sender
        notification.center.remove_observer(self, sender=session)
        if session.direction == 'incoming':
            room_uri = session._invitation.request_uri    # FIXME
        else:
            # Clear invited participants mapping
            room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host)
            participant = str(session.remote_identity.uri)
            d = self.invited_participants_map[room_uri_str]
            d[participant] -= 1
            if d[participant] == 0:
                del d[participant]
            if not d:
                # Drop the empty per-room entry as well; add_participant()
                # recreates it on demand and lookups use .get()
                del self.invited_participants_map[room_uri_str]
            room_uri = session.local_identity.uri
        # We could get this notification even if we didn't get SIPSessionDidStart
        try:
            room = self.get_room(room_uri)
        except RoomNotFoundError:
            return
        if session in room.sessions:
            room.remove_session(session)
        if not room.stopping and room.empty:
            self.remove_room(room_uri)
            room.stop()

    def _NH_SIPSessionDidFail(self, notification):
        session = notification.sender
        notification.center.remove_observer(self, sender=session)
        log.msg(u'Session from %s failed: %s' % (session.remote_identity.uri, notification.data.reason))
class IncomingReferralHandler(object):
    """Carry out one REFER received by a conference room.

    A Refer-To with method INVITE (the default) makes the server call the
    referred URI and add it to the room; method BYE removes the referred
    participant.  Any other method is rejected with 488.
    """

    implements(IObserver)

    def __init__(self, refer_request, data):
        self._refer_request = refer_request
        self._refer_headers = data.headers
        self.room_uri = data.request_uri
        self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host)
        # Strip the angle brackets; start() parses the string into a SIPURI
        self.refer_to_uri = re.sub('<|>', '', data.headers.get('Refer-To').uri)
        self.method = data.headers.get('Refer-To').parameters.get('method', 'INVITE').upper()
        self.session = None
        self.streams = []

    def start(self):
        """Parse the Refer-To URI and act according to the requested method."""
        if not self.refer_to_uri.startswith(('sip:', 'sips:')):
            self.refer_to_uri = 'sip:%s' % self.refer_to_uri
        try:
            self.refer_to_uri = SIPURI.parse(self.refer_to_uri)
        except SIPCoreError:
            log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
            self._refer_request.reject(488)
            return
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self._refer_request)
        if self.method == 'INVITE':
            self._refer_request.accept()
            settings = SIPSimpleSettings()
            account = DefaultAccount()
            if account.sip.outbound_proxy is not None:
                uri = SIPURI(host=account.sip.outbound_proxy.host,
                             port=account.sip.outbound_proxy.port,
                             parameters={'transport': account.sip.outbound_proxy.transport})
            else:
                uri = self.refer_to_uri
            # The outgoing session is created from _NH_DNSLookupDidSucceed
            lookup = DNSLookup()
            notification_center.add_observer(self, sender=lookup)
            lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
        elif self.method == 'BYE':
            log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
            self._refer_request.accept()
            conference_application = ConferenceApplication()
            conference_application.remove_participant(self.refer_to_uri, self.room_uri)
            self._refer_request.end(200)
        else:
            self._refer_request.reject(488)

    def handle_notification(self, notification):
        """Dispatch *notification* to the ``_NH_<name>`` method, if there is one."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_DNSLookupDidSucceed(self, notification):
        notification_center = NotificationCenter()
        notification_center.remove_observer(self, sender=notification.sender)
        account = DefaultAccount()
        conference_application = ConferenceApplication()
        try:
            room = conference_application.get_room(self.room_uri)
        except RoomNotFoundError:
            log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
            self._refer_request.end(500)
            return
        else:
            active_media = room.active_media
        if not active_media:
            log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
            self._refer_request.end(500)
            return
        # Offer the media types currently active in the room
        if 'audio' in active_media:
            self.streams.append(AudioStream())
        if 'chat' in active_media:
            self.streams.append(ChatStream())
        self.session = Session(account)
        notification_center.add_observer(self, sender=self.session)
        original_from_header = self._refer_headers.get('From')
        if original_from_header.display_name:
            original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host)
        else:
            original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host)
        from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call')
        to_header = ToHeader(self.refer_to_uri)
        # The first route returned by the lookup is used
        route = notification.data.result[0]
        transport = route.transport
        parameters = {} if transport=='udp' else {'transport': transport}
        contact_header = ContactHeader(SIPURI(user=self.room_uri.user, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters))
        extra_headers = []
        if self._refer_headers.get('Referred-By', None) is not None:
            extra_headers.append(Header.new(self._refer_headers.get('Referred-By')))
        else:
            extra_headers.append(Header('Referred-By', str(original_from_header.uri)))
        if ThorNodeConfig.enabled:
            extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
        extra_headers.append(Header('X-Originator-From', str(original_from_header.uri)))
        extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity))
        self.session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers)

    def _NH_DNSLookupDidFail(self, notification):
        notification.center.remove_observer(self, sender=notification.sender)
        log.msg('Room %s - failed to add %s: DNS lookup failed' % (self.room_uri_str, self.refer_to_uri))
        # The REFER was already accepted in start(); end it like the other
        # failure paths do instead of leaving it open
        if self._refer_request is not None:
            self._refer_request.end(500)

    def _NH_SIPSessionGotRingIndication(self, notification):
        if self._refer_request is not None:
            self._refer_request.send_notify(180)

    def _NH_SIPSessionGotProvisionalResponse(self, notification):
        if self._refer_request is not None:
            self._refer_request.send_notify(notification.data.code, notification.data.reason)

    def _NH_SIPSessionDidStart(self, notification):
        notification.center.remove_observer(self, sender=notification.sender)
        if self._refer_request is not None:
            self._refer_request.end(200)
        # From here on the conference application owns the session
        conference_application = ConferenceApplication()
        conference_application.add_participant(self.session, self.room_uri)
        log.msg('Room %s - %s added %s' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
        self.session = None
        self.streams = []

    def _NH_SIPSessionDidFail(self, notification):
        log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, notification.data.reason))
        notification.center.remove_observer(self, sender=notification.sender)
        if self._refer_request is not None:
            self._refer_request.end(notification.data.code or 500, notification.data.reason or notification.data.code)
        self.session = None
        self.streams = []

    def _NH_SIPSessionDidEnd(self, notification):
        # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
        log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
        notification.center.remove_observer(self, sender=notification.sender)
        if self._refer_request is not None:
            self._refer_request.end(200)
        self.session = None
        self.streams = []

    def _NH_SIPIncomingReferralDidEnd(self, notification):
        notification.center.remove_observer(self, sender=notification.sender)
        # The other handlers check for None before touching the request
        self._refer_request = None
diff --git a/sylk/applications/xmppgateway/im.py b/sylk/applications/xmppgateway/im.py
index 3fffa22..d49ba2e 100644
--- a/sylk/applications/xmppgateway/im.py
+++ b/sylk/applications/xmppgateway/im.py
@@ -1,451 +1,451 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.descriptor import WriteOnceAttribute
from collections import deque
from eventlib import coros
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI
from sipsimple.core import ContactHeader, FromHeader, RouteHeader, ToHeader
from sipsimple.core import Message as SIPMessageRequest
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread, run_in_waitable_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.configuration import XMPPGatewayConfig
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, generate_sylk_resource, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPChatSession
from sylk.applications.xmppgateway.xmpp.stanzas import ChatMessage
-from sylk.extensions import ChatStream
from sylk.session import Session
+from sylk.streams import ChatStream
# Names exported by this module
__all__ = ['ChatSessionHandler', 'SIPMessageSender', 'SIPMessageError']

# Inactivity timeout of the SIP session, read from the gateway configuration at
# import time. It is passed to reactor.callLater(), so presumably in seconds.
SESSION_TIMEOUT = XMPPGatewayConfig.sip_session_timeout
class ChatSessionHandler(object):
    """Bridge one SIP/MSRP chat session with one XMPP chat session.

    Created either from an incoming SIP session (new_from_sip_session) or
    from an XMPP stanza (new_from_xmpp_stanza).  XMPP messages received
    before the SIP session is connected are queued and sent once it is.
    Posts ChatSessionDidStart, ChatSessionDidEnd and ChatSessionDidFail.
    """

    implements(IObserver)

    sip_identity = WriteOnceAttribute()
    xmpp_identity = WriteOnceAttribute()

    def __init__(self):
        self.started = False
        self.ended = False
        self.sip_session = None
        self.msrp_stream = None
        self._sip_session_timer = None
        self.use_receipts = False
        self.xmpp_session = None
        self._xmpp_message_queue = deque()
        self._pending_msrp_chunks = {}
        self._pending_xmpp_stanzas = {}

    def _set_started(self, value):
        old_value = self.__dict__.get('started', False)
        self.__dict__['started'] = value
        # Only the False -> True transition announces the start and flushes the queue
        if not old_value and value:
            NotificationCenter().post_notification('ChatSessionDidStart', sender=self)
            self._send_queued_messages()

    def _get_started(self):
        return self.__dict__['started']

    started = property(_get_started, _set_started)
    del _get_started, _set_started

    def _set_xmpp_session(self, session):
        self.__dict__['xmpp_session'] = session
        if session is not None:
            # Reset SIP session timer in case it's active
            if self._sip_session_timer is not None and self._sip_session_timer.active():
                self._sip_session_timer.reset(SESSION_TIMEOUT)
            NotificationCenter().add_observer(self, sender=session)
            session.start()
            # Reset SIP session timer in case it's active
            # NOTE(review): nesting of this second reset was reconstructed from
            # a flattened source - confirm it belongs inside the `if`
            if self._sip_session_timer is not None and self._sip_session_timer.active():
                self._sip_session_timer.reset(SESSION_TIMEOUT)

    def _get_xmpp_session(self):
        return self.__dict__['xmpp_session']

    xmpp_session = property(_get_xmpp_session, _set_xmpp_session)
    del _get_xmpp_session, _set_xmpp_session

    @classmethod
    def new_from_sip_session(cls, sip_identity, session):
        """Create a handler for an incoming SIP *session* and start accepting it."""
        instance = cls()
        instance.sip_identity = sip_identity
        instance._start_incoming_sip_session(session)
        return instance

    @classmethod
    def new_from_xmpp_stanza(cls, xmpp_identity, recipient):
        """Create a handler for *xmpp_identity* and start an outgoing SIP session to *recipient*."""
        instance = cls()
        instance.xmpp_identity = xmpp_identity
        instance._start_outgoing_sip_session(recipient)
        return instance

    @run_in_green_thread
    def _start_incoming_sip_session(self, session):
        self.sip_session = session
        # Only the first proposed chat stream is accepted
        self.msrp_stream = next(stream for stream in session.proposed_streams if stream.type=='chat')
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self.sip_session)
        notification_center.add_observer(self, sender=self.msrp_stream)
        self.sip_session.accept([self.msrp_stream])

    @run_in_green_thread
    def _start_outgoing_sip_session(self, target_uri):
        notification_center = NotificationCenter()
        # self.xmpp_identity is our local identity
        from_uri = self.xmpp_identity.uri.as_sip_uri()
        del from_uri.parameters['gr']    # no GRUU in From header
        contact_uri = self.xmpp_identity.uri.as_sip_uri()
        contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8'))
        to_uri = target_uri.as_sip_uri()
        lookup = DNSLookup()
        settings = SIPSimpleSettings()
        account = DefaultAccount()
        if account.sip.outbound_proxy is not None:
            uri = SIPURI(host=account.sip.outbound_proxy.host,
                         port=account.sip.outbound_proxy.port,
                         parameters={'transport': account.sip.outbound_proxy.transport})
        else:
            uri = to_uri
        try:
            routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
        except DNSLookupError:
            log.warning('DNS lookup error while looking for %s proxy' % uri)
            notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='DNS lookup error'))
            return
        self.msrp_stream = ChatStream()
        route = routes.pop(0)
        from_header = FromHeader(from_uri)
        to_header = ToHeader(to_uri)
        contact_header = ContactHeader(contact_uri)
        self.sip_session = Session(account)
        notification_center.add_observer(self, sender=self.sip_session)
        notification_center.add_observer(self, sender=self.msrp_stream)
        self.sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self.msrp_stream])

    def end(self):
        """End both legs and post ChatSessionDidEnd (or ChatSessionDidFail if never started).

        Calling it again after the first time does nothing.
        """
        if self.ended:
            return
        if self._sip_session_timer is not None and self._sip_session_timer.active():
            self._sip_session_timer.cancel()
        self._sip_session_timer = None
        notification_center = NotificationCenter()
        if self.sip_session is not None:
            notification_center.remove_observer(self, sender=self.sip_session)
            notification_center.remove_observer(self, sender=self.msrp_stream)
            self.sip_session.end()
            self.sip_session = None
            self.msrp_stream = None
        if self.xmpp_session is not None:
            notification_center.remove_observer(self, sender=self.xmpp_session)
            self.xmpp_session.end()
            self.xmpp_session = None
        self.ended = True
        if self.started:
            notification_center.post_notification('ChatSessionDidEnd', sender=self)
        else:
            notification_center.post_notification('ChatSessionDidFail', sender=self, data=NotificationData(reason='Ended before actually started'))

    def enqueue_xmpp_message(self, message):
        """Queue *message* for delivery once the session starts.

        Raises RuntimeError if the session is already started.
        """
        if self.started:
            raise RuntimeError('session is already started')
        self._xmpp_message_queue.append(message)

    def _send_queued_messages(self):
        """Send all queued XMPP messages over MSRP, then one 'idle' composing indication."""
        # Stays None when nothing was actually sent (empty queue, or only
        # messages without a body), in which case no indication is sent
        sender = None
        while self._xmpp_message_queue:
            message = self._xmpp_message_queue.popleft()
            if message.body is None:
                continue
            if not message.use_receipt:
                success_report = 'no'
                failure_report = 'no'
            else:
                success_report = 'yes'
                failure_report = 'yes'
            sender_uri = message.sender.uri.as_sip_uri()
            sender_uri.parameters['gr'] = encode_resource(sender_uri.parameters['gr'].decode('utf-8'))
            sender = CPIMIdentity(sender_uri)
            self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report)
        # NOTE(review): the indication is taken to follow the loop (one per
        # flush, from the last sender); nesting was reconstructed - confirm
        if sender is not None:
            self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender)

    def _inactivity_timeout(self):
        log.msg("Ending SIP session %s due to inactivity" % self.sip_session._invitation.call_id)
        self.sip_session.end()

    def handle_notification(self, notification):
        """Dispatch *notification* to the ``_NH_<name>`` method, if there is one."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidStart(self, notification):
        log.msg("SIP session %s started" % notification.sender._invitation.call_id)
        self._sip_session_timer = reactor.callLater(SESSION_TIMEOUT, self._inactivity_timeout)
        if self.sip_session.direction == 'outgoing':
            # Time to set sip_identity and create the XMPPChatSession
            contact_uri = self.sip_session._invitation.remote_contact_header.uri
            if contact_uri.parameters.get('gr') is not None:
                sip_leg_uri = FrozenURI(contact_uri.user, contact_uri.host, contact_uri.parameters.get('gr'))
            else:
                tmp = self.sip_session.remote_identity.uri
                sip_leg_uri = FrozenURI(tmp.user, tmp.host, generate_sylk_resource())
            self.sip_identity = Identity(sip_leg_uri, self.sip_session.remote_identity.display_name)
            session = XMPPChatSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
            self.xmpp_session = session
            # Session is now established on both ends
            self.started = True
            # Try to wakeup XMPP clients
            self.xmpp_session.send_composing_indication('active')
            self.xmpp_session.send_message(' ', 'text/plain')
        else:
            if self.xmpp_session is not None:
                # Session is now established on both ends
                self.started = True
                # Try to wakeup XMPP clients
                self.xmpp_session.send_composing_indication('active')
                self.xmpp_session.send_message(' ', 'text/plain')
            else:
                # Try to wakeup XMPP clients
                sender = self.sip_identity
                tmp = self.sip_session.local_identity.uri
                recipient_uri = FrozenURI(tmp.user, tmp.host)
                recipient = Identity(recipient_uri)
                xmpp_manager = XMPPManager()
                xmpp_manager.send_stanza(ChatMessage(sender, recipient, ' ', 'text/plain'))
        # Send queued messages (a no-op when setting `started` already flushed the queue)
        self._send_queued_messages()

    def _NH_SIPSessionDidEnd(self, notification):
        log.msg("SIP session %s ended" % notification.sender._invitation.call_id)
        notification.center.remove_observer(self, sender=self.sip_session)
        notification.center.remove_observer(self, sender=self.msrp_stream)
        self.sip_session = None
        self.msrp_stream = None
        self.end()

    def _NH_SIPSessionDidFail(self, notification):
        log.msg("SIP session %s failed" % notification.sender._invitation.call_id)
        notification.center.remove_observer(self, sender=self.sip_session)
        notification.center.remove_observer(self, sender=self.msrp_stream)
        self.sip_session = None
        self.msrp_stream = None
        self.end()

    def _NH_SIPSessionNewProposal(self, notification):
        if notification.data.originator == 'remote':
            self.sip_session.reject_proposal()

    def _NH_SIPSessionTransferNewIncoming(self, notification):
        self.sip_session.reject_transfer(403)

    def _NH_ChatStreamGotMessage(self, notification):
        # Notification is sent by the MSRP stream
        message = notification.data.message
        content_type = message.content_type.lower()
        if content_type not in ('text/plain', 'text/html'):
            return
        if content_type == 'text/plain':
            html_body = None
            body = message.body
        else:
            html_body = message.body
            body = None
        if self._sip_session_timer is not None and self._sip_session_timer.active():
            self._sip_session_timer.reset(SESSION_TIMEOUT)
        chunk = notification.data.chunk
        if self.started:
            self.xmpp_session.send_message(body, html_body, message_id=chunk.message_id)
            if self.use_receipts:
                # Report is sent when the XMPP side confirms or refuses delivery
                self._pending_msrp_chunks[chunk.message_id] = chunk
            else:
                self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK')
        else:
            sender = self.sip_identity
            recipient_uri = FrozenURI.parse(message.recipients[0].uri)
            recipient = Identity(recipient_uri, message.recipients[0].display_name)
            xmpp_manager = XMPPManager()
            xmpp_manager.send_stanza(ChatMessage(sender, recipient, body, html_body))
            self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK')

    def _NH_ChatStreamGotComposingIndication(self, notification):
        # Notification is sent by the MSRP stream
        if self._sip_session_timer is not None and self._sip_session_timer.active():
            self._sip_session_timer.reset(SESSION_TIMEOUT)
        if not self.started:
            return
        # Translate the MSRP state name into the one sent on the XMPP side
        state = None
        if notification.data.state == 'active':
            state = 'composing'
        elif notification.data.state == 'idle':
            state = 'paused'
        if state is not None:
            self.xmpp_session.send_composing_indication(state)

    def _NH_ChatStreamDidDeliverMessage(self, notification):
        if self.started:
            message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None)
            if message is not None:
                self.xmpp_session.send_receipt_acknowledgement(message.id)

    def _NH_ChatStreamDidNotDeliverMessage(self, notification):
        if self.started:
            message = self._pending_xmpp_stanzas.pop(notification.data.message_id, None)
            if message is not None:
                self.xmpp_session.send_error(message, 'TODO', [])    # TODO

    def _NH_XMPPChatSessionDidStart(self, notification):
        if self.sip_session is not None:
            # Session is now established on both ends
            self.started = True

    def _NH_XMPPChatSessionDidEnd(self, notification):
        notification.center.remove_observer(self, sender=self.xmpp_session)
        self.xmpp_session = None
        self.end()

    def _NH_XMPPChatSessionGotMessage(self, notification):
        if self.sip_session is None or self.sip_session.state != 'connected':
            self._xmpp_message_queue.append(notification.data.message)
            return
        if self._sip_session_timer is not None and self._sip_session_timer.active():
            self._sip_session_timer.reset(SESSION_TIMEOUT)
        message = notification.data.message
        sender_uri = message.sender.uri.as_sip_uri()
        del sender_uri.parameters['gr']    # no GRUU in CPIM From header
        sender = CPIMIdentity(sender_uri)
        self.use_receipts = message.use_receipt
        if not message.use_receipt:
            success_report = 'no'
            failure_report = 'no'
        else:
            success_report = 'yes'
            failure_report = 'yes'
            # NOTE(review): assumed to be tracked only when a receipt was
            # requested; nesting was reconstructed - confirm
            self._pending_xmpp_stanzas[message.id] = message
        # Prefer plaintext
        self.msrp_stream.send_message(message.body, 'text/plain', local_identity=sender, message_id=message.id, notify_progress=True, success_report=success_report, failure_report=failure_report)
        self.msrp_stream.send_composing_indication('idle', 30, local_identity=sender)

    def _NH_XMPPChatSessionGotComposingIndication(self, notification):
        if self.sip_session is None or self.sip_session.state != 'connected':
            return
        if self._sip_session_timer is not None and self._sip_session_timer.active():
            self._sip_session_timer.reset(SESSION_TIMEOUT)
        message = notification.data.message
        # Translate the XMPP state name into the one sent on the MSRP side
        state = None
        if message.state == 'composing':
            state = 'active'
        elif message.state == 'paused':
            state = 'idle'
        if state is not None:
            sender_uri = message.sender.uri.as_sip_uri()
            del sender_uri.parameters['gr']    # no GRUU in CPIM From header
            sender = CPIMIdentity(sender_uri)
            self.msrp_stream.send_composing_indication(state, 30, local_identity=sender)
            # NOTE(review): acknowledgement assumed to be sent only for
            # forwarded states; nesting was reconstructed - confirm
            if message.use_receipt:
                self.xmpp_session.send_receipt_acknowledgement(message.id)

    def _NH_XMPPChatSessionDidDeliverMessage(self, notification):
        chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None)
        if chunk is not None:
            self.msrp_stream.msrp_session.send_report(chunk, 200, 'OK')

    def _NH_XMPPChatSessionDidNotDeliverMessage(self, notification):
        chunk = self._pending_msrp_chunks.pop(notification.data.message_id, None)
        if chunk is not None:
            self.msrp_stream.msrp_session.send_report(chunk, notification.data.code, notification.data.reason)
def chunks(text, size):
    """Yield consecutive slices of *text*, each at most *size* items long."""
    for start in xrange(0, len(text), size):
        stop = start + size
        yield text[start:stop]
class SIPMessageError(Exception):
    """Failure to deliver a SIP MESSAGE.

    ``code`` and ``reason`` are stored as attributes; ``reason`` is also the
    exception message.
    """

    def __init__(self, code, reason):
        super(SIPMessageError, self).__init__(reason)
        self.reason = reason
        self.code = code
class SIPMessageSender(object):
    """Send the body of a message as one or more SIP MESSAGE requests.

    The body is split into pieces of at most 1000 characters; one request is
    sent per piece and send() waits until every request has either succeeded
    or failed.
    """
    implements(IObserver)

    def __init__(self, message):
        """Capture sender/recipient URIs (without GRUU) and the body of `message`."""
        # TODO: sometimes we may want to send it to the GRUU, for example when a XMPP client
        # replies to one of our messages. MESSAGE requests don't need a Contact header, though
        # so how should we communicate our GRUU to the recipient?
        self.from_uri = message.sender.uri.as_sip_uri()
        self.from_uri.parameters.pop('gr', None)    # No GRUU in From header
        self.to_uri = message.recipient.uri.as_sip_uri()
        self.to_uri.parameters.pop('gr', None)      # Don't send it to the GRUU
        self.body = message.body
        self.content_type = 'text/plain'
        self._requests = set()
        # Outcome notifications are handed from the notification handlers to send().
        self._channel = coros.queue()

    @run_in_waitable_green_thread
    def send(self):
        """Resolve a route, send every piece of the body and wait for the outcomes.

        Raises SIPMessageError with code 0 when the DNS lookup fails, or with
        the code/reason of the last failed request when any request fails.
        """
        lookup = DNSLookup()
        settings = SIPSimpleSettings()
        account = DefaultAccount()
        if account.sip.outbound_proxy is not None:
            uri = SIPURI(host=account.sip.outbound_proxy.host,
                         port=account.sip.outbound_proxy.port,
                         parameters={'transport': account.sip.outbound_proxy.transport})
        else:
            uri = self.to_uri
        try:
            routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
        except DNSLookupError:
            msg = 'DNS lookup error while looking for %s proxy' % uri
            log.warning(msg)
            raise SIPMessageError(0, msg)

        # Only the first route returned by the lookup is used.
        route = routes.pop(0)
        from_header = FromHeader(self.from_uri)
        to_header = ToHeader(self.to_uri)
        route_header = RouteHeader(route.uri)
        notification_center = NotificationCenter()
        for chunk in chunks(self.body, 1000):
            # Each request carries its own piece of the body (previously the
            # whole body was passed here, once per piece).
            request = SIPMessageRequest(from_header, to_header, route_header, self.content_type, chunk)
            notification_center.add_observer(self, sender=request)
            self._requests.add(request)
            request.send()

        # Collect exactly one outcome per request; remember the last failure.
        error = None
        count = len(self._requests)
        while count > 0:
            notification = self._channel.wait()
            if notification.name == 'SIPMessageDidFail':
                error = (notification.data.code, notification.data.reason)
            count -= 1
        self._requests.clear()
        if error is not None:
            raise SIPMessageError(*error)

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Dispatch to the `_NH_<notification name>` method, ignoring unknown names."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPMessageDidSucceed(self, notification):
        """Stop observing the finished request and hand the outcome to send()."""
        notification.center.remove_observer(self, sender=notification.sender)
        self._channel.send(notification)

    def _NH_SIPMessageDidFail(self, notification):
        """Stop observing the failed request and hand the outcome to send()."""
        notification.center.remove_observer(self, sender=notification.sender)
        self._channel.send(notification)
diff --git a/sylk/applications/xmppgateway/muc.py b/sylk/applications/xmppgateway/muc.py
index ed8fac2..9814c20 100644
--- a/sylk/applications/xmppgateway/muc.py
+++ b/sylk/applications/xmppgateway/muc.py
@@ -1,470 +1,470 @@
# Copyright (C) 2012 AG Projects. See LICENSE for details
#
import random
import uuid
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null, limit
from application.python.descriptor import WriteOnceAttribute
from eventlib import coros, proc
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import Engine, SIPURI, SIPCoreError, Referral, sip_status_messages
from sipsimple.core import ContactHeader, FromHeader, ToHeader, ReferToHeader, RouteHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.streams.msrp import ChatStreamError
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from time import time
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.xmppgateway.datatypes import Identity, FrozenURI, encode_resource
from sylk.applications.xmppgateway.logger import log
from sylk.applications.xmppgateway.xmpp import XMPPManager
from sylk.applications.xmppgateway.xmpp.session import XMPPIncomingMucSession
from sylk.applications.xmppgateway.xmpp.stanzas import MUCAvailabilityPresence, MUCErrorPresence, OutgoingInvitationMessage, STANZAS_NS
from sylk.configuration import SIPConfig
-from sylk.extensions import ChatStream
from sylk.session import Session
+from sylk.streams import ChatStream
class ReferralError(Exception):
    """Failure while sending or maintaining a referral.

    `error` is a human readable description and `code` a numeric status
    (0 when none applies). The description is also passed to the base
    Exception so that str(e) and logged tracebacks show it even when the
    exception is built with keyword arguments.
    """

    def __init__(self, error, code=0):
        Exception.__init__(self, error)
        self.error = error
        self.code = code
class SIPReferralDidFail(Exception):
    """Exception wrapper around the data of a failed-referral notification.

    Instances only carry the payload, available as the `data` attribute.
    """

    def __init__(self, data):
        # Keep the notification payload for whoever catches the exception.
        self.data = data
class MucInvitationFailure(object):
    """Describes why a MUC invitation failed: a code plus a reason.

    The string form is "<code> (<reason>)".
    """

    def __init__(self, code, reason):
        self.code, self.reason = code, reason

    def __str__(self):
        parts = (self.code, self.reason)
        return '%s (%s)' % parts
# Sends a SIP REFER (Refer-To: participant, method=INVITE) from `sender` to
# `recipient` and follows the resulting referral until it ends or fails.
# Posts X2SMucInvitationHandlerDidStart, then either ...DidEnd or ...DidFail
# (the latter carrying a MucInvitationFailure in its data).
class X2SMucInvitationHandler(object):
implements(IObserver)
def __init__(self, sender, recipient, participant):
self.sender = sender
self.recipient = recipient
self.participant = participant
# True only while the established referral is being followed (see _run).
self.active = False
# Route currently in use; read by _refresh() to pick the transport.
self.route = None
# Queue through which the notification handlers feed _run().
self._channel = coros.queue()
self._referral = None
self._failure = None
# Start observing network changes, run the referral logic via proc.spawn and
# announce that the handler started.
def start(self):
notification_center = NotificationCenter()
notification_center.add_observer(self, name='NetworkConditionsDidChange')
proc.spawn(self._run)
notification_center.post_notification('X2SMucInvitationHandlerDidStart', sender=self)
# Body of the handler: resolve routes, try to send the REFER over each route
# in turn, then wait for the referral to end. Any ReferralError is recorded
# as a MucInvitationFailure and reported through the DidFail notification.
def _run(self):
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
sender_uri = self.sender.uri.as_sip_uri()
recipient_uri = self.recipient.uri.as_sip_uri()
participant_uri = self.participant.uri.as_sip_uri()
try:
# Lookup routes
account = DefaultAccount()
# Lookup target: the outbound proxy when it is set and its transport is
# enabled, else the account domain when always_use_my_proxy is set, else
# the recipient itself.
if account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list:
uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport})
elif account.sip.always_use_my_proxy:
uri = SIPURI(host=account.id.domain)
else:
uri = SIPURI.new(recipient_uri)
lookup = DNSLookup()
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError, e:
# NOTE(review): this value is never read; the raise below leaves the scope.
timeout = random.uniform(15, 30)
raise ReferralError(error='DNS lookup failed: %s' % e)
# Overall deadline (30 seconds from now) shared by all route attempts.
timeout = time() + 30
for route in routes:
self.route = route
remaining_time = timeout - time()
if remaining_time > 0:
transport = route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
refer_to_header = ReferToHeader(str(participant_uri))
refer_to_header.parameters['method'] = 'INVITE'
referral = Referral(recipient_uri, FromHeader(sender_uri),
ToHeader(recipient_uri),
refer_to_header,
ContactHeader(contact_uri),
RouteHeader(route.uri),
account.credentials)
notification_center.add_observer(self, sender=referral)
try:
# Per-attempt timeout is the remaining time clamped to 1..5 seconds.
referral.send_refer(timeout=limit(remaining_time, min=1, max=5))
except SIPCoreError:
notification_center.remove_observer(self, sender=referral)
# NOTE(review): this value is never read; the raise below leaves the loop.
timeout = 5
raise ReferralError(error='Internal error')
self._referral = referral
try:
# Wait until the referral reports that it started; a failure arrives as
# a SIPReferralDidFail exception raised by the channel.
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidStart':
break
except SIPReferralDidFail, e:
notification_center.remove_observer(self, sender=referral)
self._referral = None
# 403 and 405 abort immediately instead of moving on to another route.
if e.data.code in (403, 405):
raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code)
else:
# Otherwise just try the next route
continue
else:
break
else:
self.route = None
raise ReferralError(error='No more routes to try')
# At this point it is subscribed. Handle notifications and ending/failures.
try:
self.active = True
while True:
notification = self._channel.wait()
if notification.name == 'SIPReferralDidEnd':
break
except SIPReferralDidFail, e:
notification_center.remove_observer(self, sender=self._referral)
raise ReferralError(error=e.data.reason, code=e.data.code)
else:
notification_center.remove_observer(self, sender=self._referral)
finally:
self.active = False
except ReferralError, e:
self._failure = MucInvitationFailure(e.code, e.error)
finally:
notification_center.remove_observer(self, name='NetworkConditionsDidChange')
self._referral = None
if self._failure is not None:
notification_center.post_notification('X2SMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
else:
notification_center.post_notification('X2SMucInvitationHandlerDidEnd', sender=self)
# Refresh the referral with a Contact header rebuilt for the current route's
# transport and the current local IP.
def _refresh(self):
account = DefaultAccount()
transport = self.route.transport
parameters = {} if transport=='udp' else {'transport': transport}
contact_uri = SIPURI(user=account.contact.username, host=SIPConfig.local_ip.normalized, port=getattr(Engine(), '%s_port' % transport), parameters=parameters)
contact_header = ContactHeader(contact_uri)
self._referral.refresh(contact_header=contact_header, timeout=2)
# Dispatch to the `_NH_<notification name>` method; unknown names fall back
# to Null.
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# The referral notifications below are forwarded to _run() through the channel.
def _NH_SIPReferralDidStart(self, notification):
self._channel.send(notification)
def _NH_SIPReferralDidEnd(self, notification):
self._channel.send(notification)
# A failure is delivered as an exception so that the wait() in _run() raises.
def _NH_SIPReferralDidFail(self, notification):
self._channel.send_exception(SIPReferralDidFail(notification.data))
def _NH_SIPReferralGotNotify(self, notification):
self._channel.send(notification)
# Network changes only trigger a refresh while the referral is being followed.
def _NH_NetworkConditionsDidChange(self, notification):
if self.active:
self._refresh()
class S2XMucInvitationHandler(object):
    """Send an XMPP MUC invitation on behalf of a SIP session.

    start() sends an OutgoingInvitationMessage stanza and arms a 90 second
    timer. The handler finishes through stop(), which posts
    S2XMucInvitationHandlerDidFail when a failure was recorded (timeout or
    SIP session failure) and S2XMucInvitationHandlerDidEnd otherwise.
    """
    implements(IObserver)

    def __init__(self, session, sender, recipient, inviter):
        self.session = session
        self.sender = sender
        self.recipient = recipient
        self.inviter = inviter
        self._timer = None
        self._failure = None

    def start(self):
        """Observe the session, send the invitation stanza and arm the timeout."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self.session)
        stanza = OutgoingInvitationMessage(self.sender, self.recipient, self.inviter, id='MUC.'+uuid.uuid4().hex)
        xmpp_manager = XMPPManager()
        xmpp_manager.send_muc_stanza(stanza)
        self._timer = reactor.callLater(90, self._timeout)
        notification_center.post_notification('S2XMucInvitationHandlerDidStart', sender=self)

    def stop(self):
        """Cancel the timer, release the session and post the final notification."""
        if self._timer is not None and self._timer.active():
            self._timer.cancel()
        self._timer = None
        notification_center = NotificationCenter()
        if self.session is not None:
            notification_center.remove_observer(self, sender=self.session)
            # A session that is still held is ended 5 seconds later, not immediately.
            reactor.callLater(5, self._end_session, self.session)
            self.session = None
        if self._failure is not None:
            notification_center.post_notification('S2XMucInvitationHandlerDidFail', sender=self, data=NotificationData(failure=self._failure))
        else:
            notification_center.post_notification('S2XMucInvitationHandlerDidEnd', sender=self)

    def _end_session(self, session):
        """End `session` with code 480, ignoring any error (best effort)."""
        try:
            session.end(480)
        except Exception:
            pass

    def _timeout(self):
        """Timer callback: end the session with 408 and finish with a failure."""
        NotificationCenter().remove_observer(self, sender=self.session)
        try:
            self.session.end(408)
        except Exception:
            pass
        self.session = None
        # MucInvitationFailure takes (code, reason); they were passed swapped before.
        self._failure = MucInvitationFailure(408, 'Timeout')
        self.stop()

    def handle_notification(self, notification):
        """Dispatch to the `_NH_<notification name>` method, ignoring unknown names."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidFail(self, notification):
        """Record the failure of the SIP session and finish."""
        notification.center.remove_observer(self, sender=self.session)
        self.session = None
        # MucInvitationFailure takes (code, reason); they were passed swapped before.
        reason = notification.data.reason or notification.data.failure_reason
        self._failure = MucInvitationFailure(notification.data.code, reason)
        self.stop()
# Couples an incoming XMPP MUC session with an outgoing SIP session that
# carries a ChatStream: conference-info and chat messages coming from the SIP
# side are turned into MUC stanzas, while MUC messages and nickname changes
# are sent over the MSRP stream.
class X2SMucHandler(object):
implements(IObserver)
sip_identity = WriteOnceAttribute()
xmpp_identity = WriteOnceAttribute()
def __init__(self, sip_identity, xmpp_identity, nickname):
self.sip_identity = sip_identity
self.xmpp_identity = xmpp_identity
self.nickname = nickname
self._xmpp_muc_session = None
self._sip_session = None
self._msrp_stream = None
# NOTE(review): only ever assigned None within this class — confirm whether
# something outside sets it before the SIP session starts.
self._first_stanza = None
self._pending_nicknames_map = {} # map message ID of MSRP NICKNAME chunk to corresponding stanza
self._pending_messages_map = {} # map message ID of MSRP SEND chunk to corresponding stanza
self._participants = set() # set of (URI, nickname) tuples
self.ended = False
# Create and start the XMPP MUC session, announce the handler and kick off
# the SIP session set-up.
def start(self):
notification_center = NotificationCenter()
self._xmpp_muc_session = XMPPIncomingMucSession(local_identity=self.sip_identity, remote_identity=self.xmpp_identity)
notification_center.add_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session.start()
notification_center.post_notification('X2SMucHandlerDidStart', sender=self)
self._start_sip_session()
# Tear down whichever of the two sessions still exists and post
# X2SMucHandlerDidEnd. Safe to call more than once: later calls return early.
def end(self):
if self.ended:
return
notification_center = NotificationCenter()
if self._xmpp_muc_session is not None:
notification_center.remove_observer(self, sender=self._xmpp_muc_session)
# Send indication that the user has been kicked from the room
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('307')
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
self._xmpp_muc_session.end()
self._xmpp_muc_session = None
if self._sip_session is not None:
notification_center.remove_observer(self, sender=self._sip_session)
self._sip_session.end()
self._sip_session = None
self.ended = True
notification_center.post_notification('X2SMucHandlerDidEnd', sender=self)
# Resolve a route and connect a SIP session with a single ChatStream towards
# the SIP identity. A DNS lookup failure ends the handler.
@run_in_green_thread
def _start_sip_session(self):
# self.xmpp_identity is our local identity
from_uri = self.xmpp_identity.uri.as_sip_uri()
# NOTE(review): both uses of 'gr' below raise KeyError if the URI has no
# 'gr' parameter — presumably the XMPP identity always carries a resource.
del from_uri.parameters['gr'] # no GRUU in From header
contact_uri = self.xmpp_identity.uri.as_sip_uri()
contact_uri.parameters['gr'] = encode_resource(contact_uri.parameters['gr'].decode('utf-8'))
to_uri = self.sip_identity.uri.as_sip_uri()
lookup = DNSLookup()
settings = SIPSimpleSettings()
account = DefaultAccount()
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = to_uri
try:
routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
except DNSLookupError:
log.warning('DNS lookup error while looking for %s proxy' % uri)
self.end()
return
self._msrp_stream = ChatStream()
# Only the first route returned by the lookup is used.
route = routes.pop(0)
from_header = FromHeader(from_uri)
to_header = ToHeader(to_uri)
contact_header = ContactHeader(contact_uri)
self._sip_session = Session(account)
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._sip_session)
notification_center.add_observer(self, sender=self._msrp_stream)
self._sip_session.connect(from_header, to_header, contact_header=contact_header, route=route, streams=[self._msrp_stream])
# Dispatch to the `_NH_<notification name>` method; unknown names fall back
# to Null.
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# Once the SIP session is up, give up unless the remote end is a focus and
# the stream allows nicknames; otherwise request our nickname and remember
# the request by its message id.
def _NH_SIPSessionDidStart(self, notification):
log.msg("SIP multiparty session %s started" % notification.sender._invitation.call_id)
if not self._sip_session.remote_focus or not self._msrp_stream.nickname_allowed:
self.end()
return
message_id = self._msrp_stream.set_local_nickname(self.nickname)
self._pending_nicknames_map[message_id] = (self.nickname, self._first_stanza)
self._first_stanza = None
# Session ended or failed: drop the SIP side and end the handler.
def _NH_SIPSessionDidEnd(self, notification):
log.msg("SIP multiparty session %s ended" % notification.sender._invitation.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
def _NH_SIPSessionDidFail(self, notification):
log.msg("SIP multiparty session %s failed" % notification.sender._invitation.call_id)
notification.center.remove_observer(self, sender=self._sip_session)
notification.center.remove_observer(self, sender=self._msrp_stream)
self._sip_session = None
self._msrp_stream = None
self.end()
# Proposals made by the remote party are rejected.
def _NH_SIPSessionNewProposal(self, notification):
if notification.data.originator == 'remote':
self._sip_session.reject_proposal()
# Incoming transfers are rejected with 403.
def _NH_SIPSessionTransferNewIncoming(self, notification):
self._sip_session.reject_transfer(403)
# Turn a conference-info update into MUC presence stanzas: unavailable for
# participants that disappeared, available for everyone currently listed
# (except ourselves), and finally our own presence.
def _NH_SIPSessionGotConferenceInfo(self, notification):
# Translate to XMPP payload
xmpp_manager = XMPPManager()
own_uri = FrozenURI(self.xmpp_identity.uri.user, self.xmpp_identity.uri.host)
conference_info = notification.data.conference_info
new_participants = set()
for user in conference_info.users:
# Entities without a sip:/sips: scheme get 'sip:' prepended before parsing.
user_uri = FrozenURI.parse(user.entity if user.entity.startswith(('sip:', 'sips:')) else 'sip:'+user.entity)
# The display text is the nickname; the entity is the fallback.
nickname = user.display_text.value if user.display_text else user.entity
new_participants.add((user_uri, nickname))
# Remove participants that are no longer in the room
for uri, nickname in self._participants - new_participants:
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=False)
xmpp_manager.send_muc_stanza(stanza)
# Send presence for current participants
for uri, nickname in new_participants:
if uri == own_uri:
continue
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = Identity(uri)
xmpp_manager.send_muc_stanza(stanza)
self._participants = new_participants
# Send own status last
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, self.nickname))
stanza = MUCAvailabilityPresence(sender, self.xmpp_identity, available=True)
stanza.jid = self.xmpp_identity
stanza.muc_statuses.append('110')
xmpp_manager.send_muc_stanza(stanza)
# Forward a text/plain or text/html chat message to the MUC session and
# confirm the chunk with a 200 report. Other content types are ignored (and
# not reported).
def _NH_ChatStreamGotMessage(self, notification):
# Notification is sent by the MSRP stream
if not self._xmpp_muc_session:
return
message = notification.data.message
content_type = message.content_type.lower()
if content_type not in ('text/plain', 'text/html'):
return
if content_type == 'text/plain':
html_body = None
body = message.body
else:
html_body = message.body
body = None
# The sender's display name, or its URI as a string, becomes the resource.
resource = message.sender.display_name or str(message.sender.uri)
sender = Identity(FrozenURI(self.sip_identity.uri.user, self.sip_identity.uri.host, resource))
self._xmpp_muc_session.send_message(sender, body, html_body, message_id='MUC.'+uuid.uuid4().hex)
self._msrp_stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
# The nickname request was accepted: adopt the nickname that was requested.
def _NH_ChatStreamDidSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
self.nickname = nickname
# The nickname request was refused: answer the originating stanza with a
# 'conflict' error presence.
def _NH_ChatStreamDidNotSetNickname(self, notification):
# Notification is sent by the MSRP stream
nickname, stanza = self._pending_nicknames_map.pop(notification.data.message_id)
error_stanza = MUCErrorPresence.from_stanza(stanza, 'cancel', [('conflict', STANZAS_NS)])
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(error_stanza)
def _NH_ChatStreamDidDeliverMessage(self, notification):
# Echo back the message to the sender
stanza = self._pending_messages_map.pop(notification.data.message_id)
# Swap sender and recipient, then put our nickname into the new sender URI.
stanza.sender, stanza.recipient = stanza.recipient, stanza.sender
stanza.sender.uri = FrozenURI(stanza.sender.uri.user, stanza.sender.uri.host, self.nickname)
xmpp_manager = XMPPManager()
xmpp_manager.send_muc_stanza(stanza)
# An undelivered message is simply forgotten; nothing is echoed back.
def _NH_ChatStreamDidNotDeliverMessage(self, notification):
self._pending_messages_map.pop(notification.data.message_id)
# The XMPP side ended: drop it and end the handler.
def _NH_XMPPIncomingMucSessionDidEnd(self, notification):
notification.center.remove_observer(self, sender=self._xmpp_muc_session)
self._xmpp_muc_session = None
self.end()
# Send a MUC message over the MSRP stream as text/plain and remember it by
# message id until its delivery outcome is known.
def _NH_XMPPIncomingMucSessionGotMessage(self, notification):
if not self._sip_session:
return
message = notification.data.message
sender_uri = message.sender.uri.as_sip_uri()
del sender_uri.parameters['gr'] # no GRUU in CPIM From header
sender = CPIMIdentity(sender_uri, display_name=self.nickname)
message_id = self._msrp_stream.send_message(message.body, 'text/plain', local_identity=sender)
self._pending_messages_map[message_id] = message
# Message will be echoed back to the sender on ChatStreamDidDeliverMessage
# Request the new nickname on the MSRP stream; a ChatStreamError drops the
# request silently.
def _NH_XMPPIncomingMucSessionChangedNickname(self, notification):
if not self._sip_session:
return
nickname = notification.data.nickname
try:
message_id = self._msrp_stream.set_local_nickname(nickname)
except ChatStreamError:
return
self._pending_nicknames_map[message_id] = (nickname, notification.data.stanza)
diff --git a/sylk/server.py b/sylk/server.py
index f44e180..c603357 100644
--- a/sylk/server.py
+++ b/sylk/server.py
@@ -1,229 +1,230 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
import sys
from threading import Event
from uuid import uuid4
from application import log
from application.notification import NotificationCenter
from application.python import Null
from eventlib import proc
from sipsimple.account import Account, BonjourAccount, AccountManager
from sipsimple.application import SIPApplication
from sipsimple.audio import AudioDevice, RootAudioBridge
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import AudioMixer
from sipsimple.lookup import DNSManager
from sipsimple.storage import MemoryStorage
from sipsimple.threading import ThreadManager
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
-# Load extensions needed for integration with SIP SIMPLE SDK
-import sylk.extensions
+# Load stream extensions needed for integration with SIP SIMPLE SDK
+import sylk.streams
+del sylk.streams
from sylk.accounts import DefaultAccount
from sylk.applications import IncomingRequestHandler
from sylk.configuration import ServerConfig, SIPConfig, ThorNodeConfig
from sylk.configuration.settings import AccountExtension, BonjourAccountExtension, SylkServerSettingsExtension
from sylk.log import Logger
from sylk.session import SessionManager
# SIPApplication subclass that customises engine start-up, subsystem
# initialisation/shutdown and logging for SylkServer.
class SylkServer(SIPApplication):
def __init__(self):
# Null placeholders until the real components are created in
# _initialize_subsystems().
self.request_handler = Null
self.thor_interface = Null
self.logger = Logger()
# stopping_event is set when the application will end; stop_event when it
# has ended (see the SIPApplicationWillEnd/DidEnd handlers).
self.stopping_event = Event()
self.stop_event = Event()
# Register the settings extensions and start the application with an
# in-memory storage backend; any start-up exception is logged as fatal and
# the process exits with status 1.
def start(self, options):
self.options = options
if self.options.enable_bonjour:
ServerConfig.enable_bonjour = True
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
notification_center.add_observer(self, name='ThorNetworkGotFatalError')
Account.register_extension(AccountExtension)
BonjourAccount.register_extension(BonjourAccountExtension)
SIPSimpleSettings.register_extension(SylkServerSettingsExtension)
try:
super(SylkServer, self).start(MemoryStorage())
except Exception, e:
log.fatal("Error starting SIP Application: %s" % e)
sys.exit(1)
# Build the engine options from the settings and start the engine.
def _initialize_core(self):
# SylkServer needs to listen for extra events and request types
notification_center = NotificationCenter()
settings = SIPSimpleSettings()
# initialize core
# UDP/TCP ports are passed as None when the transport is not enabled; the
# TLS port and TLS files are passed as None here.
options = dict(# general
ip_address=SIPConfig.local_ip,
user_agent=settings.user_agent,
# SIP
detect_sip_loops=True,
udp_port=settings.sip.udp_port if 'udp' in settings.sip.transport_list else None,
tcp_port=settings.sip.tcp_port if 'tcp' in settings.sip.transport_list else None,
tls_port=None,
# TLS
tls_verify_server=False,
tls_ca_file=None,
tls_cert_file=None,
tls_privkey_file=None,
# rtp
rtp_port_range=(settings.rtp.port_range.start, settings.rtp.port_range.end),
# audio
codecs=list(settings.rtp.audio_codec_list),
# video
video_codecs=list(settings.rtp.video_codec_list),
# logging
log_level=settings.logs.pjsip_level if settings.logs.trace_pjsip else 0,
trace_sip=settings.logs.trace_sip,
# events and requests to handle
events={'conference': ['application/conference-info+xml'],
'presence': ['application/pidf+xml'],
'refer': ['message/sipfrag;version=2.0']},
incoming_events=set(['conference', 'presence']),
incoming_requests=set(['MESSAGE']))
notification_center.add_observer(self, sender=self.engine)
self.engine.start(**options)
# Bring up the default account, TLS, name servers, audio objects and the
# middleware managers, then the SylkServer specific components.
@run_in_green_thread
def _initialize_subsystems(self):
account_manager = AccountManager()
dns_manager = DNSManager()
notification_center = NotificationCenter()
session_manager = SessionManager()
settings = SIPSimpleSettings()
notification_center.post_notification('SIPApplicationWillStart', sender=self)
# A stop requested while starting aborts here by stopping the reactor.
if self.state == 'stopping':
reactor.stop()
return
# Initialize default account
default_account = DefaultAccount()
account_manager.default_account = default_account
# initialize TLS
self._initialize_tls()
# initialize PJSIP internal resolver
self.engine.set_nameservers(dns_manager.nameservers)
# initialize audio objects
voice_mixer = AudioMixer(None, None, settings.audio.sample_rate, 0, 9999)
self.voice_audio_device = AudioDevice(voice_mixer)
self.voice_audio_bridge = RootAudioBridge(voice_mixer)
self.voice_audio_bridge.add(self.voice_audio_device)
# initialize instance id
settings.instance_id = uuid4().urn
settings.save()
# initialize middleware components
dns_manager.start()
account_manager.start()
session_manager.start()
notification_center.add_observer(self, name='CFGSettingsObjectDidChange')
self.state = 'started'
notification_center.post_notification('SIPApplicationDidStart', sender=self)
# start SylkServer components
# The SIP Thor interface is imported only when ThorNodeConfig.enabled is set.
if ThorNodeConfig.enabled:
from sylk.interfaces.sipthor import ConferenceNode
self.thor_interface = ConferenceNode()
self.request_handler = IncomingRequestHandler()
self.request_handler.start()
# Stop the components in order: SylkServer components, middleware managers,
# engine, threads and finally the reactor. Each group is stopped in spawned
# procs that are waited for before moving on.
@run_in_green_thread
def _shutdown_subsystems(self):
# shutdown SylkServer components
procs = [proc.spawn(self.request_handler.stop), proc.spawn(self.thor_interface.stop)]
proc.waitall(procs)
# shutdown middleware components
dns_manager = DNSManager()
account_manager = AccountManager()
session_manager = SessionManager()
procs = [proc.spawn(dns_manager.stop), proc.spawn(account_manager.stop), proc.spawn(session_manager.stop)]
proc.waitall(procs)
# shutdown engine
self.engine.stop()
self.engine.join()
# stop threads
thread_manager = ThreadManager()
thread_manager.stop()
# stop the reactor
reactor.stop()
# Audio device change notifications are deliberately ignored.
def _NH_AudioDevicesDidChange(self, notification):
pass
def _NH_DefaultAudioDeviceDidChange(self, notification):
pass
def _NH_SIPApplicationFailedToStartTLS(self, notification):
log.fatal("Couldn't set TLS options: %s" % notification.data.error)
# Start the logger and report which trace files are in use.
def _NH_SIPApplicationWillStart(self, notification):
self.logger.start()
settings = SIPSimpleSettings()
if settings.logs.trace_sip and self.logger._siptrace_filename is not None:
log.msg('Logging SIP trace to file "%s"' % self.logger._siptrace_filename)
if settings.logs.trace_msrp and self.logger._msrptrace_filename is not None:
log.msg('Logging MSRP trace to file "%s"' % self.logger._msrptrace_filename)
if settings.logs.trace_pjsip and self.logger._pjsiptrace_filename is not None:
log.msg('Logging PJSIP trace to file "%s"' % self.logger._pjsiptrace_filename)
if settings.logs.trace_notifications and self.logger._notifications_filename is not None:
log.msg('Logging notifications trace to file "%s"' % self.logger._notifications_filename)
# Log the address and port of every configured transport.
def _NH_SIPApplicationDidStart(self, notification):
settings = SIPSimpleSettings()
local_ip = SIPConfig.local_ip
log.msg("SylkServer started, listening on:")
for transport in settings.sip.transport_list:
try:
log.msg("%s:%d (%s)" % (local_ip, getattr(self.engine, '%s_port' % transport), transport.upper()))
except TypeError:
# presumably the port is None for a transport that was not started, which
# makes the %d formatting fail — such transports are skipped.
pass
def _NH_SIPApplicationWillEnd(self, notification):
log.msg('SIP application will end: %s' % self.end_reason)
self.stopping_event.set()
# Stop the logger and release anyone waiting on the stop events; warn when
# the application ended without going through SIPApplicationWillEnd first.
def _NH_SIPApplicationDidEnd(self, notification):
log.msg('SIP application ended')
self.logger.stop()
if not self.stopping_event.is_set():
log.warning('SIP application ended without shutting down all subsystems')
self.stopping_event.set()
self.stop_event.set()
def _NH_SIPEngineGotException(self, notification):
log.error('An exception occured within the SIP core:\n%s\n' % notification.data.traceback)
# Log the failure, then let the base class handle it.
def _NH_SIPEngineDidFail(self, notification):
log.error('SIP engine failed')
super(SylkServer, self)._NH_SIPEngineDidFail(notification)
def _NH_ThorNetworkGotFatalError(self, notification):
log.error("All Thor Event Servers have unrecoverable errors.")
diff --git a/sylk/extensions.py b/sylk/streams.py
similarity index 100%
rename from sylk/extensions.py
rename to sylk/streams.py

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 7:14 AM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409004
Default Alt Text
(72 KB)

Event Timeline