Page MenuHomePhabricator

No OneTemporary

diff --git a/debian/sylkserver.install b/debian/sylkserver.install
index 6ddca12..527b78f 100644
--- a/debian/sylkserver.install
+++ b/debian/sylkserver.install
@@ -1,3 +1,2 @@
usr/bin
usr/lib
-tls/* etc/sylkserver/tls
diff --git a/tls/ca.crt b/resources/tls/ca.crt
similarity index 100%
rename from tls/ca.crt
rename to resources/tls/ca.crt
diff --git a/tls/default.crt b/resources/tls/default.crt
similarity index 100%
rename from tls/default.crt
rename to resources/tls/default.crt
diff --git a/sylk/applications/conference/__init__.py b/sylk/applications/conference/__init__.py
index 32f2865..2f4cd20 100644
--- a/sylk/applications/conference/__init__.py
+++ b/sylk/applications/conference/__init__.py
@@ -1,434 +1,436 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details
#
import mimetypes
import os
import re
import shutil
from application.notification import IObserver, NotificationCenter
from application.python import Null
from gnutls.interfaces.twisted import X509Credentials
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, SIPCoreError
from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader
from sipsimple.lookup import DNSLookup
from sipsimple.streams import AudioStream
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications import SylkApplication
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.applications.conference.room import Room
from sylk.applications.conference.web import ScreenSharingWebServer
from sylk.bonjour import BonjourServices
from sylk.configuration import ServerConfig, ThorNodeConfig
from sylk.session import Session, IllegalStateError
from sylk.streams import ChatStream
from sylk.tls import Certificate, PrivateKey
class ACLValidationError(Exception):
    """Raised by validate_acl when a URI is not permitted by a room's access lists."""
class RoomNotFoundError(Exception):
    """Raised by get_room when the room is unknown and creation was not requested."""
class ConferenceApplication(SylkApplication):
implements(IObserver)
def __init__(self):
    """Create the empty room registry, invitation map and service placeholders."""
    # room 'user@host' string -> Room
    self._rooms = {}
    # room 'user@host' string -> {participant URI string: number of outgoing sessions}
    self.invited_participants_map = {}
    # Replaced with real BonjourServices objects by start() when Bonjour is enabled
    self.bonjour_focus_service = self.bonjour_room_service = Null
    self.screen_sharing_web_server = None
def start(self):
# Bring the application up: purge stale spool directories, optionally publish the
# Bonjour services and start the screen sharing web server.
# cleanup old files
for path in (ConferenceConfig.file_transfer_dir, ConferenceConfig.screen_sharing_dir):
try:
shutil.rmtree(path)
except EnvironmentError:
# best effort: removal errors are deliberately ignored
pass
# Bonjour is published only when enabled and 'conference' is the default application
if ServerConfig.enable_bonjour and ServerConfig.default_application == 'conference':
self.bonjour_focus_service = BonjourServices(service='sipfocus')
self.bonjour_focus_service.start()
log.msg("Bonjour publication started for service 'sipfocus'")
self.bonjour_room_service = BonjourServices(service='sipuri', name='Conference Room', uri_user='conference')
self.bonjour_room_service.start()
self.bonjour_room_service.presence_state = BonjourPresenceState('available', u'No participants')
log.msg("Bonjour publication started for service 'sipuri'")
self.screen_sharing_web_server = ScreenSharingWebServer(ConferenceConfig.screen_sharing_dir)
# TLS credentials are built only when HTTPS is enabled and the certificate check passes;
# the certificate and the private key are both loaded from the same configured file.
- if ConferenceConfig.screen_sharing_use_https and ConferenceConfig.screen_sharing_certificate is not None:
+ if ConferenceConfig.screen_sharing_use_https and os.path.isfile(ConferenceConfig.screen_sharing_certificate):
cert = Certificate(ConferenceConfig.screen_sharing_certificate.normalized)
key = PrivateKey(ConferenceConfig.screen_sharing_certificate.normalized)
credentials = X509Credentials(cert, key)
+ transport = 'https'
else:
# no credentials are passed to the web server; the listener is logged as http
credentials = None
+ transport = 'http'
self.screen_sharing_web_server.start(ConferenceConfig.screen_sharing_ip, ConferenceConfig.screen_sharing_port, credentials)
# log the address the listener actually bound to
listen_address = self.screen_sharing_web_server.listener.getHost()
- log.msg("ScreenSharing listener started on %s:%d" % (listen_address.host, listen_address.port))
+ log.msg("ScreenSharing listener started on %s://%s:%d" % (transport, listen_address.host, listen_address.port))
def stop(self):
    """Stop the Bonjour services and the screen sharing web server.

    The web server attribute is initialised to None (unlike the Bonjour
    services, which default to Null), so it is only stopped if start()
    actually created it. This keeps stop() safe when called before start().
    """
    self.bonjour_focus_service.stop()
    self.bonjour_room_service.stop()
    if self.screen_sharing_web_server is not None:
        self.screen_sharing_web_server.stop()
def get_room(self, uri, create=False):
    """Return the Room registered for *uri* (keyed by 'user@host').

    When the room is unknown it is created and registered if *create* is
    true; otherwise RoomNotFoundError is raised.
    """
    key = '%s@%s' % (uri.user, uri.host)
    if key in self._rooms:
        return self._rooms[key]
    if not create:
        raise RoomNotFoundError
    new_room = Room(key)
    self._rooms[key] = new_room
    return new_room
def remove_room(self, uri):
    """Drop the room registered for *uri*; unknown rooms are silently ignored."""
    key = '%s@%s' % (uri.user, uri.host)
    if key in self._rooms:
        del self._rooms[key]
def validate_acl(self, room_uri, from_uri):
    """Check *from_uri* against the access lists configured for *room_uri*.

    Returns None when access is granted and raises ACLValidationError
    otherwise. With the 'allow,deny' policy the URI must match the allow
    list and not the deny list; with the other policy it is rejected only
    when it matches the deny list and not the allow list.
    """
    cfg = get_room_config('%s@%s' % (room_uri.user, room_uri.host))
    if cfg.access_policy == 'allow,deny':
        permitted = cfg.allow.match(from_uri) and not cfg.deny.match(from_uri)
    else:
        permitted = not (cfg.deny.match(from_uri) and not cfg.allow.match(from_uri))
    if not permitted:
        raise ACLValidationError
def incoming_session(self, session):
# Screen an incoming session: reject unsupported media (488), ACL failures (403) and
# requests for unknown rooms/files (404); otherwise schedule the session to be accepted.
log.msg('New session from %s to %s' % (session.remote_identity.uri, session.local_identity.uri))
# split the proposed streams by media type
audio_streams = [stream for stream in session.proposed_streams if stream.type=='audio']
chat_streams = [stream for stream in session.proposed_streams if stream.type=='chat']
transfer_streams = [stream for stream in session.proposed_streams if stream.type=='file-transfer']
if not audio_streams and not chat_streams and not transfer_streams:
log.msg(u'Session rejected: invalid media, only RTP audio and MSRP chat are supported')
session.reject(488)
return
try:
self.validate_acl(session.request_uri, session.remote_identity.uri)
except ACLValidationError:
log.msg(u'Session rejected: unauthorized by access list')
session.reject(403)
return
# Check if requested files belong to this room
# (only file-transfer streams with direction 'sendonly' are examined)
for stream in (stream for stream in transfer_streams if stream.direction == 'sendonly'):
try:
room = self.get_room(session.request_uri)
except RoomNotFoundError:
log.msg(u'Session rejected: room not found')
session.reject(404)
return
# the requested file is identified by its hash among the files known to the room
try:
file = next(file for file in room.files if file.hash == stream.file_selector.hash)
except StopIteration:
log.msg(u'Session rejected: requested file not found')
session.reject(404)
return
filename = os.path.basename(file.name)
# locate the file by name anywhere under the room's transfer directory
for dirpath, dirnames, filenames in os.walk(os.path.join(ConferenceConfig.file_transfer_dir, room.uri)):
if filename in filenames:
path = os.path.join(dirpath, filename)
stream.file_selector.fd = open(path, 'r')
# fill in size and MIME type when the file selector does not carry them
if stream.file_selector.size is None:
stream.file_selector.size = os.fstat(stream.file_selector.fd.fileno()).st_size
if stream.file_selector.type is None:
mime_type, encoding = mimetypes.guess_type(filename)
if encoding is not None:
type = 'application/x-%s' % encoding
elif mime_type is not None:
type = mime_type
else:
type = 'application/octet-stream'
stream.file_selector.type = type
break
else:
# File got removed from the filesystem
log.msg(u'Session rejected: requested file removed from the filesystem')
session.reject(404)
return
NotificationCenter().add_observer(self, sender=session)
if audio_streams:
session.send_ring_indication()
# accept at most one stream of each kind (the first proposed)
streams = [streams[0] for streams in (audio_streams, chat_streams, transfer_streams) if streams]
# audio sessions are accepted after a 4 second delay, others on the next reactor iteration
reactor.callLater(4 if audio_streams else 0, self.accept_session, session, streams)
def incoming_subscription(self, subscribe_request, data):
# Handle a SUBSCRIBE: validate the headers (400), the event package (489) and the ACL (403),
# then hand the request to the matching started room, or answer 480 if there is none.
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
if Null in (from_header, to_header):
subscribe_request.reject(400)
return
if subscribe_request.event != 'conference':
log.msg(u'Subscription rejected: only conference event is supported')
subscribe_request.reject(489)
return
# The ACL is checked against the request URI first and the To URI second
try:
self.validate_acl(data.request_uri, from_header.uri)
except ACLValidationError:
try:
self.validate_acl(to_header.uri, from_header.uri)
except ACLValidationError:
# Check if we need to skip the ACL because this was an invited participant
if not (str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (data.request_uri.user, data.request_uri.host), {}) or
str(from_header.uri) in self.invited_participants_map.get('%s@%s' % (to_header.uri.user, to_header.uri.host), {})):
log.msg(u'Subscription rejected: unauthorized by access list')
subscribe_request.reject(403)
return
# The room is looked up by request URI first and by To URI second
try:
room = self.get_room(data.request_uri)
except RoomNotFoundError:
try:
room = self.get_room(to_header.uri)
except RoomNotFoundError:
log.msg(u'Subscription rejected: room not yet created')
subscribe_request.reject(480)
return
if not room.started:
log.msg(u'Subscription rejected: room not started yet')
subscribe_request.reject(480)
else:
room.handle_incoming_subscription(subscribe_request, data)
def incoming_referral(self, refer_request, data):
# Handle a REFER: reject with 400 when From/To/Refer-To is missing and with 403 when the
# sender fails the ACL for the request URI; otherwise delegate to an IncomingReferralHandler.
from_header = data.headers.get('From', Null)
to_header = data.headers.get('To', Null)
refer_to_header = data.headers.get('Refer-To', Null)
if Null in (from_header, to_header, refer_to_header):
refer_request.reject(400)
return
log.msg(u'Room %s - join request from %s to %s' % ('%s@%s' % (to_header.uri.user, to_header.uri.host), from_header.uri, refer_to_header.uri))
try:
self.validate_acl(data.request_uri, from_header.uri)
except ACLValidationError:
log.msg(u'Room %s - invite participant request rejected: unauthorized by access list' % data.request_uri)
refer_request.reject(403)
return
# the handler takes over the rest of the referral
referral_handler = IncomingReferralHandler(refer_request, data)
referral_handler.start()
def incoming_message(self, message_request, data):
    """Refuse SIP MESSAGE requests: log the reason and answer with 405."""
    reason = u'SIP MESSAGE is not supported, use MSRP media instead'
    log.msg(reason)
    message_request.answer(405)
def accept_session(self, session, streams):
    """Accept *session* with *streams* as conference focus if it is still incoming.

    An IllegalStateError raised by the accept call is ignored.
    """
    if session.state != 'incoming':
        return
    try:
        session.accept(streams, is_focus=True)
    except IllegalStateError:
        pass
def add_participant(self, session, room_uri):
    """Register an outgoing *session* with the room at *room_uri*, creating the room if needed."""
    room_key = '%s@%s' % (room_uri.user, room_uri.host)
    log.msg(u'Room %s - outgoing session to %s started' % (room_key, session.remote_identity.uri))
    # Keep track of the invited participants, we must skip ACL policy
    # for SUBSCRIBE requests
    invited = self.invited_participants_map.setdefault(room_key, {})
    participant = str(session.remote_identity.uri)
    invited[participant] = invited.get(participant, 0) + 1
    NotificationCenter().add_observer(self, sender=session)
    room = self.get_room(room_uri, create=True)
    room.start()
    room.add_session(session)
def remove_participant(self, participant_uri, room_uri):
    """Terminate the sessions of *participant_uri* in the room at *room_uri*.

    Nothing happens when the room does not exist.
    """
    try:
        room = self.get_room(room_uri)
    except RoomNotFoundError:
        return
    log.msg('Room %s - %s removed from conference' % (room_uri, participant_uri))
    room.terminate_sessions(participant_uri)
def handle_notification(self, notification):
    """Dispatch *notification* to the method named '_NH_<name>', falling back to Null."""
    getattr(self, '_NH_%s' % notification.name, Null)(notification)
def _NH_SIPSessionDidStart(self, notification):
    """Add the started session to the room for its request URI, creating the room if needed."""
    started_session = notification.sender
    room = self.get_room(started_session.request_uri, create=True)
    room.start()
    room.add_session(started_session)
@run_in_green_thread
def _NH_SIPSessionDidEnd(self, notification):
# Stop observing the ended session, drop it from its room and stop the room once it is empty.
session = notification.sender
notification.center.remove_observer(self, sender=session)
# incoming sessions identify the room by request URI, outgoing ones by local identity
if session.direction == 'incoming':
room_uri = session.request_uri
else:
# Clear invited participants mapping
room_uri_str = '%s@%s' % (session.local_identity.uri.user, session.local_identity.uri.host)
d = self.invited_participants_map[room_uri_str]
# per-participant counter of outgoing sessions; the entry is removed when it reaches zero
d[str(session.remote_identity.uri)] -= 1
if d[str(session.remote_identity.uri)] == 0:
del d[str(session.remote_identity.uri)]
room_uri = session.local_identity.uri
# We could get this notification even if we didn't get SIPSessionDidStart
try:
room = self.get_room(room_uri)
except RoomNotFoundError:
return
if session in room.sessions:
room.remove_session(session)
# unregister the room before stopping it
if not room.stopping and room.empty:
self.remove_room(room_uri)
room.stop()
def _NH_SIPSessionDidFail(self, notification):
    """Stop observing the failed session and log the failure reason."""
    failed_session = notification.sender
    notification.center.remove_observer(self, sender=failed_session)
    details = (failed_session.remote_identity.uri, notification.data.reason)
    log.msg(u'Session from %s failed: %s' % details)
class IncomingReferralHandler(object):
implements(IObserver)
def __init__(self, refer_request, data):
    """Capture the REFER request, its headers and the target taken from the Refer-To header.

    The Refer-To URI has its angle brackets stripped; the method defaults to
    INVITE and is upper-cased.
    """
    refer_to = data.headers.get('Refer-To')
    self._refer_request = refer_request
    self._refer_headers = data.headers
    self.room_uri = data.request_uri
    self.room_uri_str = '%s@%s' % (self.room_uri.user, self.room_uri.host)
    self.refer_to_uri = re.sub('<|>', '', refer_to.uri)
    self.method = refer_to.parameters.get('method', 'INVITE').upper()
    # populated while an outgoing session for this referral is in progress
    self.session = None
    self.streams = []
def start(self):
# Process the referral: parse the Refer-To target, then either start inviting it (INVITE),
# remove it from the room (BYE) or reject the request with 488 for any other method.
# a target without a scheme is treated as a sip: URI
if not self.refer_to_uri.startswith(('sip:', 'sips:')):
self.refer_to_uri = 'sip:%s' % self.refer_to_uri
try:
self.refer_to_uri = SIPURI.parse(self.refer_to_uri)
except SIPCoreError:
log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.reject(488)
return
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self._refer_request)
if self.method == 'INVITE':
self._refer_request.accept()
settings = SIPSimpleSettings()
account = DefaultAccount()
# resolve the outbound proxy when one is configured, otherwise the target itself
if account.sip.outbound_proxy is not None:
uri = SIPURI(host=account.sip.outbound_proxy.host,
port=account.sip.outbound_proxy.port,
parameters={'transport': account.sip.outbound_proxy.transport})
else:
uri = self.refer_to_uri
# the lookup result arrives through the DNSLookupDidSucceed/DidFail notifications
lookup = DNSLookup()
notification_center.add_observer(self, sender=lookup)
lookup.lookup_sip_proxy(uri, settings.sip.transport_list)
elif self.method == 'BYE':
log.msg('Room %s - %s removed %s from the room' % (self.room_uri_str, self._refer_headers.get('From').uri, self.refer_to_uri))
self._refer_request.accept()
conference_application = ConferenceApplication()
conference_application.remove_participant(self.refer_to_uri, self.room_uri)
self._refer_request.end(200)
else:
self._refer_request.reject(488)
def handle_notification(self, notification):
    """Dispatch *notification* to the method named '_NH_<name>', falling back to Null."""
    getattr(self, '_NH_%s' % notification.name, Null)(notification)
def _NH_DNSLookupDidSucceed(self, notification):
# DNS resolution finished: start an outgoing session to the referred party, offering the
# media types currently active in the room. The referral is ended with 500 when the room
# does not exist or has no active media.
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=notification.sender)
account = DefaultAccount()
conference_application = ConferenceApplication()
try:
room = conference_application.get_room(self.room_uri)
except RoomNotFoundError:
log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.end(500)
return
else:
active_media = room.active_media
if not active_media:
log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
self._refer_request.end(500)
return
# only audio and chat are offered to the invited party
if 'audio' in active_media:
self.streams.append(AudioStream())
if 'chat' in active_media:
self.streams.append(ChatStream())
self.session = Session(account)
notification_center.add_observer(self, sender=self.session)
# identity of the referrer, used in the Subject header below
original_from_header = self._refer_headers.get('From')
if original_from_header.display_name:
original_identity = "%s <%s@%s>" % (original_from_header.display_name, original_from_header.uri.user, original_from_header.uri.host)
else:
original_identity = "%s@%s" % (original_from_header.uri.user, original_from_header.uri.host)
# the invitation is sent from the room URI
from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference Call')
to_header = ToHeader(self.refer_to_uri)
extra_headers = []
# reuse the incoming Referred-By header when present, otherwise use the referrer's URI
if self._refer_headers.get('Referred-By', None) is not None:
extra_headers.append(Header.new(self._refer_headers.get('Referred-By')))
else:
extra_headers.append(Header('Referred-By', str(original_from_header.uri)))
if ThorNodeConfig.enabled:
extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
extra_headers.append(Header('X-Originator-From', str(original_from_header.uri)))
extra_headers.append(SubjectHeader(u'Join conference request from %s' % original_identity))
# only the first route of the lookup result is used
route = notification.data.result[0]
self.session.connect(from_header, to_header, route=route, streams=self.streams, is_focus=True, extra_headers=extra_headers)
def _NH_DNSLookupDidFail(self, notification):
    """Handle a failed DNS lookup for the referred party.

    The referral was already accepted in start(), so besides dropping the
    lookup observer the failure is logged and the referral is ended with
    500, the same way the other failure paths of this handler do. Without
    this the referrer never learns the outcome and this handler stays
    registered as an observer of the refer request.
    """
    notification.center.remove_observer(self, sender=notification.sender)
    log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
    if self._refer_request is not None:
        self._refer_request.end(500)
def _NH_SIPSessionGotRingIndication(self, notification):
    """Notify the referrer with code 180 while the refer request is still set."""
    refer_request = self._refer_request
    if refer_request is None:
        return
    refer_request.send_notify(180)
def _NH_SIPSessionGotProvisionalResponse(self, notification):
    """Forward the provisional response code and reason to the referrer, if still set."""
    refer_request = self._refer_request
    if refer_request is None:
        return
    refer_request.send_notify(notification.data.code, notification.data.reason)
def _NH_SIPSessionDidStart(self, notification):
    """The invited party answered: end the referral with 200 and add the session to the room."""
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(200)
    ConferenceApplication().add_participant(self.session, self.room_uri)
    referrer = self._refer_headers.get('From').uri
    log.msg('Room %s - %s added %s' % (self.room_uri_str, referrer, self.refer_to_uri))
    # the session now belongs to the room; drop our references
    self.session, self.streams = None, []
def _NH_SIPSessionDidFail(self, notification):
    """Log the failed invitation and end the referral with the failure code (500 if unset)."""
    failure = notification.data
    log.msg('Room %s - failed to add %s: %s' % (self.room_uri_str, self.refer_to_uri, failure.reason))
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(failure.code or 500, failure.reason or failure.code)
    self.session, self.streams = None, []
def _NH_SIPSessionDidEnd(self, notification):
    """Handle the session ending before it started: log it and end the referral with 200."""
    # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead
    log.msg('Room %s - failed to add %s' % (self.room_uri_str, self.refer_to_uri))
    notification.center.remove_observer(self, sender=notification.sender)
    refer_request = self._refer_request
    if refer_request is not None:
        refer_request.end(200)
    self.session, self.streams = None, []
def _NH_SIPIncomingReferralDidEnd(self, notification):
    """Stop observing the finished referral and forget it so later handlers skip it."""
    ended_referral = notification.sender
    notification.center.remove_observer(self, sender=ended_referral)
    self._refer_request = None
diff --git a/sylk/applications/conference/configuration.py b/sylk/applications/conference/configuration.py
index 1bbfca1..b341048 100644
--- a/sylk/applications/conference/configuration.py
+++ b/sylk/applications/conference/configuration.py
@@ -1,131 +1,132 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
__all__ = ['ConferenceConfig', 'get_room_config']
import re
from application.configuration import ConfigFile, ConfigSection, ConfigSetting
from application.configuration.datatypes import StringList, Hostname
from application.system import host
-from sylk.configuration.datatypes import IPAddress, NillablePath, Path, Port
+from sylk.configuration.datatypes import IPAddress, Path, Port
+from sylk.resources import Resources
# Datatypes
class AccessPolicyValue(str):
    """String setting naming the order in which the allow and deny lists are evaluated.

    All whitespace is removed from the given value before validation, so
    'allow, deny' is accepted and stored as 'allow,deny'. Raises ValueError
    for anything other than the two allowed values.
    """

    allowed_values = ('allow,deny', 'deny,allow')

    def __new__(cls, value):
        # Raw string: '\s' inside a plain literal is an invalid escape sequence
        value = re.sub(r'\s', '', value)
        if value not in cls.allowed_values:
            raise ValueError('invalid value, allowed values are: %s' % ' | '.join(cls.allowed_values))
        return str.__new__(cls, value)
class Domain(str):
    """String holding a domain name: dot-separated labels of letters, digits, '-' and '_'.

    Raises ValueError when the value does not match that pattern.
    """

    domain_re = re.compile(r"^[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*$")

    def __new__(cls, value):
        text = str(value)
        if cls.domain_re.match(text) is None:
            raise ValueError("illegal domain: %s" % text)
        return str.__new__(cls, text)
class SIPAddress(str):
    """String holding a SIP address in user@domain form.

    Every '@' except the last one is escaped as '%40', so the user part may
    itself contain '@'. Raises ValueError when there is no '@' or when the
    domain part is not a valid Domain.
    """

    def __new__(cls, address):
        address = str(address)
        # escape all but the last '@'
        address = address.replace('@', '%40', address.count('@') - 1)
        error_message = "illegal SIP address: %s, must be in user@domain format" % address
        pieces = address.split('@')
        if len(pieces) != 2:
            raise ValueError(error_message)
        try:
            Domain(pieces[1])
        except ValueError:
            raise ValueError(error_message)
        return str.__new__(cls, address)
class PolicySettingValue(list):
    """List of SIP addresses and/or domains used as an allow or deny list.

    Accepts a list/tuple of entries or a string: 'none' or '' gives an empty
    list, 'any', 'all' or '*' gives the wildcard list ['*'], and anything
    else is split on commas. Entries containing '@' become SIPAddress
    objects, the rest Domain objects. Raises TypeError for other input types.
    """

    def __init__(self, value):
        if isinstance(value, (tuple, list)):
            entries = [str(x) for x in value]
        elif isinstance(value, basestring):
            keyword = value.lower()
            if keyword in ('none', ''):
                entries = []
            elif keyword in ('any', 'all', '*'):
                # the wildcard is stored as a plain string, it is not a valid Domain
                list.__init__(self, ['*'])
                return
            else:
                entries = re.split(r'\s*,\s*', value)
        else:
            raise TypeError("value must be a string, list or tuple")
        list.__init__(self, [SIPAddress(entry) if '@' in entry else Domain(entry) for entry in entries])

    def match(self, uri):
        """Return True if the list is the wildcard or contains the URI's address or its host."""
        if self == ['*']:
            return True
        host = uri.host
        address = re.sub('^(sip:|sips:)', '', str(uri))
        return address in self or host in self
# Configuration objects
class ConferenceConfig(ConfigSection):
# Settings read from the [Conference] section of conference.ini; the values below are the defaults.
__cfgfile__ = 'conference.ini'
__section__ = 'Conference'
# used by Room as the maxlen of its message history deque
history_size = 20
# ACL defaults: AccessPolicyValue strips the whitespace, so this is stored as 'allow,deny'
access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
file_transfer_dir = ConfigSetting(type=Path, value=Path('var/spool/sylkserver'))
push_file_transfer = False
# screen sharing web server settings
screen_sharing_dir = ConfigSetting(type=Path, value=Path('var/spool/sylkserver/screensharing'))
screen_sharing_ip = ConfigSetting(type=IPAddress, value=IPAddress(host.default_ip))
screen_sharing_hostname = ConfigSetting(type=Hostname, value=IPAddress(host.default_ip))
# NOTE(review): port 0 presumably lets the OS pick a free port - confirm against the web server
screen_sharing_port = ConfigSetting(type=Port, value=0)
screen_sharing_use_https = True
# the default certificate is now resolved through Resources instead of a relative NillablePath
- screen_sharing_certificate = ConfigSetting(type=NillablePath, value=NillablePath('tls/default.crt'))
+ screen_sharing_certificate = ConfigSetting(type=Path, value=Path(Resources.get('tls/default.crt')))
advertise_xmpp_support = True
pstn_access_numbers = ConfigSetting(type=StringList, value='')
class RoomConfig(ConfigSection):
# Per-room settings from conference.ini; get_room_config reads the section named after the room.
__cfgfile__ = 'conference.ini'
# ACL defaults mirror the ones declared on ConferenceConfig
access_policy = ConfigSetting(type=AccessPolicyValue, value=AccessPolicyValue('allow, deny'))
allow = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('all'))
deny = ConfigSetting(type=PolicySettingValue, value=PolicySettingValue('none'))
# these two default to whatever ConferenceConfig holds when this class body is executed
pstn_access_numbers = ConferenceConfig.pstn_access_numbers
advertise_xmpp_support = ConferenceConfig.advertise_xmpp_support
class Configuration(object):
    """Plain attribute container: every item of *data* becomes an instance attribute."""

    def __init__(self, data):
        vars(self).update(data)
def get_room_config(room):
    """Return a Configuration snapshot of the settings that apply to *room*.

    When conference.ini has a section named after the room it is read into
    RoomConfig before the snapshot is taken and RoomConfig is reset
    afterwards; otherwise the snapshot holds the general policy.
    """
    has_own_section = ConfigFile(RoomConfig.__cfgfile__).get_section(room) is not None
    if has_own_section:
        RoomConfig.read(section=room)
    snapshot = Configuration(dict(RoomConfig))
    if has_own_section:
        RoomConfig.reset()
    return snapshot
diff --git a/sylk/applications/conference/room.py b/sylk/applications/conference/room.py
index cc468aa..6045b09 100644
--- a/sylk/applications/conference/room.py
+++ b/sylk/applications/conference/room.py
@@ -1,1183 +1,1184 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
import hashlib
import os
import random
import re
import shutil
import string
import weakref
from collections import defaultdict, deque
from glob import glob
from itertools import chain, count, cycle
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.system import makedirs
from eventlib import api, coros, proc
from sipsimple.account.bonjour import BonjourPresenceState
from sipsimple.application import SIPApplication
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPCoreError, SIPCoreInvalidStateError, SIPURI
from sipsimple.core import Header, FromHeader, ToHeader, SubjectHeader
from sipsimple.lookup import DNSLookup, DNSLookupError
from sipsimple.payloads import conference
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.streams.msrp import ChatStreamError, FileSelector
from sipsimple.threading import run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import ISOTimestamp
from twisted.internet import reactor
from zope.interface import implements
from sylk.accounts import DefaultAccount
from sylk.applications.conference.configuration import get_room_config, ConferenceConfig
from sylk.applications.conference.logger import log
from sylk.bonjour import BonjourServices
from sylk.configuration import ServerConfig, ThorNodeConfig
-from sylk.configuration.datatypes import ResourcePath, URL
+from sylk.configuration.datatypes import URL
+from sylk.resources import Resources
from sylk.session import Session, IllegalStateError
from sylk.streams import FileTransferStream
def format_identity(identity):
    """Render *identity* as 'Display Name <user@host>', or 'user@host' without a display name."""
    uri = identity.uri
    user, host = uri.user, uri.host
    display_name = identity.display_name
    return u'%s <%s@%s>' % (display_name, user, host) if display_name else u'%s@%s' % (user, host)
class ScreenImage(object):
# A participant's shared screen image: stores the latest image on disk, exposes the URL it is
# served at and tracks whether the sharing is 'active' or 'idle'.
def __init__(self, room, sender):
# the room is held through a weak reference
self.room = weakref.ref(room)
self.room_uri = room.uri
self.sender = sender
# file name: <screen_sharing_dir>/<room uri>/<user>@<host>_<10 random characters>.jpg
self.filename = os.path.join(ConferenceConfig.screen_sharing_dir, room.uri, '%s@%s_%s.jpg' % (sender.uri.user, sender.uri.host, ''.join(random.sample(string.letters+string.digits, 10))))
# imported here rather than at module level (sylk.applications.conference imports this module)
from sylk.applications.conference import ConferenceApplication
port = ConferenceApplication().screen_sharing_web_server.port
# NOTE(review): the scheme follows screen_sharing_use_https only, while
# ConferenceApplication.start() also requires the certificate check to pass before using
# TLS credentials - the advertised URL may say https when the listener is plain http; confirm
scheme = 'https' if ConferenceConfig.screen_sharing_use_https else 'http'
host = ConferenceConfig.screen_sharing_hostname or ConferenceConfig.screen_sharing_ip.normalized
self.url = URL('%s://%s:%s/' % (scheme, host, port))
self.url.query_items['image'] = os.path.join(room.uri, os.path.basename(self.filename))
# state is None until the first advertise(), then 'active' or 'idle'
self.state = None
self.timer = None
@property
def active(self):
# True while the state is 'active'
return self.state == 'active'
@property
def idle(self):
# True while the state is 'idle'
return self.state == 'idle'
@run_in_thread('file-io')
def save(self, image):
# Write the image to a temporary file and rename it over the final file name
makedirs(os.path.dirname(self.filename))
tmp_filename = self.filename + '.tmp'
try:
with open(tmp_filename, 'wb') as file:
file.write(image)
except EnvironmentError, e:
log.msg('Room %s - cannot write screen sharing image: %s: %s' % (self.room_uri, self.filename, e))
else:
try:
os.rename(tmp_filename, self.filename)
except EnvironmentError:
# rename errors are ignored
pass
self.advertise()
@run_in_twisted_thread
def advertise(self):
# Mark the image as actively shared and (re)arm the 10 second timer that calls stop_advertising
if self.state == 'active':
self.timer.reset(10)
else:
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'active'
self.timer = reactor.callLater(10, self.stop_advertising)
# fall back to Null when the weakly referenced room is gone
room = self.room() or Null
room.dispatch_conference_info()
txt = 'Room %s - %s is sharing the screen at %s' % (self.room_uri, format_identity(self.sender), self.url)
room.dispatch_server_message(txt)
log.msg(txt)
@run_in_twisted_thread
def stop_advertising(self):
# Switch to 'idle', cancel any pending timer and announce that the sharing stopped
if self.state != 'idle':
if self.timer is not None and self.timer.active():
self.timer.cancel()
self.state = 'idle'
self.timer = None
# fall back to Null when the weakly referenced room is gone
room = self.room() or Null
room.dispatch_conference_info()
txt = '%s stopped sharing the screen' % format_identity(self.sender)
room.dispatch_server_message(txt)
log.msg(txt)
class Room(object):
"""
Object representing a conference room, it will handle the message dispatching
among all the participants.
"""
implements(IObserver)
def __init__(self, uri):
# Create a stopped room for the 'user@host' string *uri*; start() activates it.
self.config = get_room_config(uri)
self.uri = uri
self.identity = CPIMIdentity(SIPURI.parse('sip:%s' % self.uri), display_name='Conference Room')
self.files = []
# screen images looked up by 'user@host' of the sharing participant (see conference_info)
self.screen_images = {}
self.sessions = []
self.subscriptions = []
self.transfer_handlers = weakref.WeakSet()
# one of 'stopped', 'started', 'stopping' (see start/stop)
self.state = 'stopped'
# consumed by _message_dispatcher as (session, message_type, data) items
self.incoming_message_queue = coros.queue()
self.message_dispatcher = None
self.audio_conference = None
self.moh_player = None
# conference-info document, built on first use by the conference_info property
self.conference_info_payload = None
self.conference_info_version = count(1)
self.bonjour_services = Null
self.session_nickname_map = {}
self.last_nicknames_map = {}
self.participants_counter = defaultdict(lambda: 0)
# only the last ConferenceConfig.history_size messages are kept
self.history = deque(maxlen=ConferenceConfig.history_size)
@property
def empty(self):
    """True when the room has no sessions."""
    return not self.sessions
@property
def started(self):
    """True while the room state is 'started'."""
    current_state = self.state
    return current_state == 'started'
@property
def stopping(self):
    """True while the room state is 'stopping' or 'stopped'."""
    current_state = self.state
    return current_state == 'stopping' or current_state == 'stopped'
@property
def active_media(self):
    """Set of the stream types found across all sessions that have streams."""
    return set(stream.type
               for session in self.sessions if session.streams
               for stream in session.streams)
@property
def conference_info(self):
# Build the conference-info document for the current room state and return it via toxml().
# The static description is created once and cached; version, state, users and file
# resources are refreshed on every access.
if self.conference_info_payload is None:
settings = SIPSimpleSettings()
conference_description = conference.ConferenceDescription(display_text='Ad-hoc conference', free_text='Hosted by %s' % settings.user_agent)
conference_description.conf_uris = conference.ConfUris()
conference_description.conf_uris.add(conference.ConfUrisEntry('sip:%s' % self.uri, purpose='participation'))
if self.config.advertise_xmpp_support:
conference_description.conf_uris.add(conference.ConfUrisEntry('xmpp:%s' % self.uri, purpose='participation'))
# TODO: add grouptextchat service uri
for number in self.config.pstn_access_numbers:
conference_description.conf_uris.add(conference.ConfUrisEntry('tel:%s' % number, purpose='participation'))
host_info = conference.HostInfo(web_page=conference.WebPage('http://sylkserver.com'))
self.conference_info_payload = conference.Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=conference.Users())
# every access consumes the next version number
self.conference_info_payload.version = next(self.conference_info_version)
user_count = len(self.participants_counter.keys())
self.conference_info_payload.conference_state = conference.ConferenceState(user_count=user_count, active=True)
users = conference.Users()
# sessions whose only stream is a file transfer are not listed as participants
for session in (session for session in self.sessions if not (len(session.streams) == 1 and session.streams[0].type == 'file-transfer')):
# one user entry per remote URI; further sessions of the same URI add endpoints to it
try:
user = next(user for user in users if user.entity == str(session.remote_identity.uri))
except StopIteration:
display_text = self.last_nicknames_map.get(str(session.remote_identity.uri), session.remote_identity.display_name)
user = conference.User(str(session.remote_identity.uri), display_text=display_text)
user_uri = '%s@%s' % (session.remote_identity.uri.user, session.remote_identity.uri.host)
screen_image = self.screen_images.get(user_uri, None)
if screen_image is not None and screen_image.active:
user.screen_image_url = screen_image.url
users.add(user)
joining_info = conference.JoiningInfo(when=session.start_time)
# the endpoint is 'on-hold' only if it has holdable streams and all are held by the remote
holdable_streams = [stream for stream in session.streams if stream.hold_supported]
session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
hold_status = conference.EndpointStatus('on-hold' if session_on_hold else 'connected')
display_text = self.session_nickname_map.get(session, session.remote_identity.display_name)
endpoint = conference.Endpoint(str(session._invitation.remote_contact_header.uri), display_text=display_text, joining_info=joining_info, status=hold_status)
for stream in session.streams:
if stream.type == 'file-transfer':
continue
endpoint.add(conference.Media(id(stream), media_type=self.format_conference_stream_type(stream)))
user.add(endpoint)
self.conference_info_payload.users = users
if self.files:
files = conference.FileResources(conference.FileResource(os.path.basename(file.name), file.hash, file.size, file.sender, file.status) for file in self.files)
self.conference_info_payload.conference_description.resources = conference.Resources(files=files)
return self.conference_info_payload.toxml()
def start(self):
    """Activate the room: Bonjour publication (if applicable), message dispatcher and audio.

    Does nothing when the room is already started.
    """
    if self.started:
        return
    if ServerConfig.enable_bonjour and self.identity.uri.user != 'conference':
        user = self.identity.uri.user
        self.bonjour_services = BonjourServices(service='sipuri', name='Conference Room %s' % user, uri_user=user)
        self.bonjour_services.start()
    self.message_dispatcher = proc.spawn(self._message_dispatcher)
    # the audio conference is put on hold right after creation
    audio_conference = AudioConference()
    self.audio_conference = audio_conference
    audio_conference.hold()
    moh_player = MoHPlayer(audio_conference)
    self.moh_player = moh_player
    moh_player.start()
    self.state = 'started'
def stop(self):
    """Shut the room down and release everything start() set up.

    Does nothing unless the room is started. The state is 'stopping' while
    the teardown runs and 'stopped' once it has finished.
    """
    if not self.started:
        return
    self.state = 'stopping'

    self.bonjour_services.stop()
    self.bonjour_services = None

    # end the dispatcher: first through its queue, then by killing the greenlet
    self.incoming_message_queue.send_exception(api.GreenletExit)
    self.incoming_message_queue = None
    self.message_dispatcher.kill(proc.ProcExit)
    self.message_dispatcher = None

    self.moh_player.stop()
    self.moh_player = None
    self.audio_conference = None

    for handler in self.transfer_handlers:
        handler.stop()

    center = NotificationCenter()
    for subscription in self.subscriptions:
        center.remove_observer(self, sender=subscription)
        subscription.end()
    self.subscriptions = []

    self.cleanup_files()
    self.conference_info_payload = None
    self.state = 'stopped'
@run_in_thread('file-io')
def cleanup_files(self):
    """Remove this room's file transfer and screen sharing directories, ignoring errors."""
    for setting_name in ('file_transfer_dir', 'screen_sharing_dir'):
        path = os.path.join(getattr(ConferenceConfig, setting_name), self.uri)
        try:
            shutil.rmtree(path)
        except EnvironmentError:
            pass
def _message_dispatcher(self):
    """Read from self.incoming_message_queue and dispatch the messages to other participants"""

    def is_private(recipients):
        # Private means: exactly one recipient and that recipient is not the room itself.
        first = recipients[0]
        return len(recipients) == 1 and '%s@%s' % (first.uri.user, first.uri.host) != self.uri

    while True:
        session, message_type, data = self.incoming_message_queue.wait()
        if message_type == 'message':
            message = data.message
            # Drop messages whose sender does not match the session that delivered
            # them, and anything whose body starts with the '?OTR:' marker.
            if message.sender.uri != session.remote_identity.uri or message.body.startswith('?OTR:'):
                continue
            if message.timestamp is None:
                message.timestamp = ISOTimestamp.utcnow()
            # Prefer the last nickname recorded for this URI over the supplied display name.
            remote_uri = str(session.remote_identity.uri)
            message.sender.display_name = self.last_nicknames_map.get(remote_uri, message.sender.display_name)
            if is_private(message.recipients):
                self.dispatch_private_message(session, message)
            else:
                self.history.append(message)
                self.dispatch_message(session, message)
        elif message_type == 'composing_indication':
            if data.sender.uri != session.remote_identity.uri:
                continue
            if is_private(data.recipients):
                self.dispatch_private_iscomposing(session, data)
            else:
                self.dispatch_iscomposing(session, data)
def dispatch_message(self, session, message):
    """Relay a room-wide chat message to every other session that has a chat stream.

    The originating session is skipped. A ChatStreamError while sending to one
    session is logged and does not stop delivery to the others.
    """
    for peer in self.sessions:
        if peer is session:
            continue
        chat_stream = next((stream for stream in peer.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        try:
            chat_stream.send_message(message.body, message.content_type, sender=message.sender, recipients=[self.identity], timestamp=message.timestamp, additional_headers=message.additional_headers)
        except ChatStreamError as e:
            log.error(u'Error dispatching message to %s: %s' % (peer.remote_identity.uri, e))
def dispatch_private_message(self, session, message):
    """Relay a private chat message to the sessions of its first recipient.

    Private messages are delivered to all sessions matching the recipient but
    also to the sender's other sessions, for replication in clients. The
    originating session itself is skipped. A ChatStreamError is logged per
    session and does not stop delivery to the others.
    """
    recipient = message.recipients[0]
    target_uris = (recipient.uri, session.remote_identity.uri)
    for peer in self.sessions:
        if peer is session or peer.remote_identity.uri not in target_uris:
            continue
        chat_stream = next((stream for stream in peer.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        try:
            chat_stream.send_message(message.body, message.content_type, sender=message.sender, recipients=[recipient], timestamp=message.timestamp, additional_headers=message.additional_headers)
        except ChatStreamError as e:
            log.error(u'Error dispatching private message to %s: %s' % (peer.remote_identity.uri, e))
def dispatch_iscomposing(self, session, data):
    """Relay a room-wide composing indication to every other session with a chat stream.

    The indication is sent on behalf of the originating session's remote
    identity. A ChatStreamError is logged per session and does not stop
    delivery to the others.
    """
    for peer in self.sessions:
        if peer is session:
            continue
        chat_stream = next((stream for stream in peer.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        composer = CPIMIdentity(session.remote_identity.uri, session.remote_identity.display_name)
        try:
            chat_stream.send_composing_indication(data.state, data.refresh, sender=composer, recipients=[self.identity])
        except ChatStreamError as e:
            log.error(u'Error dispatching composing indication to %s: %s' % (peer.remote_identity.uri, e))
def dispatch_private_iscomposing(self, session, data):
    """Relay a private composing indication to the sessions of its first recipient.

    Only sessions whose remote URI equals the recipient URI (and that are not
    the originating session) are targeted. A ChatStreamError is logged per
    session and does not stop delivery to the others.
    """
    recipient_uri = data.recipients[0].uri
    for peer in self.sessions:
        if peer is session or peer.remote_identity.uri != recipient_uri:
            continue
        chat_stream = next((stream for stream in peer.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        composer = CPIMIdentity(session.remote_identity.uri, session.remote_identity.display_name)
        try:
            chat_stream.send_composing_indication(data.state, data.refresh, sender=composer)
        except ChatStreamError as e:
            log.error(u'Error dispatching private composing indication to %s: %s' % (peer.remote_identity.uri, e))
def dispatch_server_message(self, body, content_type='text/plain', exclude=None):
    """Send a message originated by the room itself to every session with a chat stream.

    body and content_type are passed to the chat stream as given; both sender
    and recipient are the room identity. The session given as exclude (if any)
    is skipped.

    A ChatStreamError raised for one session is logged and does not prevent
    delivery to the remaining sessions, matching the other dispatch_* methods.
    """
    for session in self.sessions:
        if session is exclude:
            continue
        chat_stream = next((stream for stream in session.streams if stream.type == 'chat'), None)
        if chat_stream is None:
            continue
        try:
            chat_stream.send_message(body, content_type, sender=self.identity, recipients=[self.identity])
        except ChatStreamError as e:
            log.error(u'Error dispatching server message to %s: %s' % (session.remote_identity.uri, e))
def dispatch_conference_info(self):
    """Push the current conference-info document to every active subscription.

    The document is read once from ``self.conference_info``. Errors from the
    SIP core while pushing to one subscription are ignored so the remaining
    subscriptions are still served.
    """
    payload = self.conference_info
    for subscription in self.subscriptions:
        if subscription.state != 'active':
            continue
        try:
            subscription.push_content(conference.ConferenceDocument.content_type, payload)
        except (SIPCoreError, SIPCoreInvalidStateError):
            pass
def dispatch_file(self, file):
    """Start an outgoing file transfer of file towards every participant except its sender.

    Participants are de-duplicated by remote URI, so one handler is started per
    distinct URI. Each handler is recorded in ``self.transfer_handlers``.
    """
    sender_key = str(file.sender.uri)
    destinations = set(session.remote_identity.uri
                       for session in self.sessions
                       if str(session.remote_identity.uri) != sender_key)
    for destination in destinations:
        transfer = OutgoingFileTransferHandler(self, destination, file)
        self.transfer_handlers.add(transfer)
        transfer.start()
def add_session(self, session):
    """Register a newly connected session with the room.

    The room starts observing the session and its chat/audio streams, counts
    the participant, and logs the audio stream parameters. A file-transfer
    stream gets a dedicated transfer handler; when it is the session's only
    stream nothing else is done. Otherwise a WelcomeHandler is run, the
    conference info is pushed, the join is logged and announced to the other
    participants, and the Bonjour presence is refreshed when Bonjour is enabled.
    """
    notification_center = NotificationCenter()
    notification_center.add_observer(self, sender=session)
    self.sessions.append(session)
    remote_uri = str(session.remote_identity.uri)
    self.participants_counter[remote_uri] += 1

    try:
        chat_stream = next(stream for stream in session.streams if stream.type == 'chat')
    except StopIteration:
        pass
    else:
        notification_center.add_observer(self, sender=chat_stream)

    try:
        audio_stream = next(stream for stream in session.streams if stream.type == 'audio')
    except StopIteration:
        pass
    else:
        notification_center.add_observer(self, sender=audio_stream)
        log.msg(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, audio_stream.codec, audio_stream.sample_rate,
                                                                                 audio_stream.local_rtp_address, audio_stream.local_rtp_port,
                                                                                 audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
        if audio_stream.encryption.type != 'ZRTP':
            # We don't listen for stream notifications early enough
            if audio_stream.encryption.active:
                log.msg(u'Room %s - %s audio stream enabled %s encryption' % (self.uri,
                                                                             format_identity(session.remote_identity),
                                                                             audio_stream.encryption.type))
            else:
                log.msg(u'Room %s - %s audio stream did not enable encryption' % (self.uri,
                                                                                 format_identity(session.remote_identity)))

    try:
        transfer_stream = next(stream for stream in session.streams if stream.type == 'file-transfer')
    except StopIteration:
        pass
    else:
        if transfer_stream.direction == 'recvonly':
            transfer_handler = IncomingFileTransferHandler(self, session)
            transfer_handler.start()
            txt = u'Room %s - %s is uploading file %s (%s)' % (self.uri, format_identity(session.remote_identity), transfer_stream.file_selector.name.decode('utf-8'), self.format_file_size(transfer_stream.file_selector.size))
        else:
            transfer_handler = OutgoingFileTransferRequestHandler(self, session)
            transfer_handler.start()
            txt = u'Room %s - %s requested file %s' % (self.uri, format_identity(session.remote_identity), transfer_stream.file_selector.name.decode('utf-8'))
        log.msg(txt)
        self.dispatch_server_message(txt)
        if len(session.streams) == 1:
            # A session carrying only a file transfer is not treated as a participant joining.
            return

    welcome_handler = WelcomeHandler(self, initial=True, session=session, streams=session.streams)
    welcome_handler.run()
    self.dispatch_conference_info()

    # format_stream_types() already prefixes its result with 'with', so the
    # log templates must not add a second one.
    stream_types = self.format_stream_types(session.streams)
    if len(self.sessions) == 1:
        log.msg(u'Room %s - started by %s %s' % (self.uri, format_identity(session.remote_identity), stream_types))
    else:
        log.msg(u'Room %s - %s joined %s' % (self.uri, format_identity(session.remote_identity), stream_types))

    # Announce the join only when this URI has no other session in the room.
    if remote_uri not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
        self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), stream_types), exclude=session)

    if ServerConfig.enable_bonjour:
        self._update_bonjour_presence()
def remove_session(self, session):
    """Unregister a session from the room.

    The room stops observing the session and its chat/audio streams, updates
    the participant counter and nickname maps, and takes the audio stream out
    of the audio conference. A session that carried only a file transfer ends
    here; otherwise the conference info is pushed, the departure is logged and
    announced, and the Bonjour presence is refreshed when Bonjour is enabled.
    """
    notification_center = NotificationCenter()
    notification_center.remove_observer(self, sender=session)
    self.sessions.remove(session)
    self.session_nickname_map.pop(session, None)
    remote_uri = str(session.remote_identity.uri)
    self.participants_counter[remote_uri] -= 1
    if self.participants_counter[remote_uri] <= 0:
        del self.participants_counter[remote_uri]
        self.last_nicknames_map.pop(remote_uri, None)

    # session.streams may be None; normalise once so every lookup below is
    # safe (the file-transfer lookup previously lacked this guard).
    streams = session.streams or []

    try:
        chat_stream = next(stream for stream in streams if stream.type == 'chat')
    except StopIteration:
        pass
    else:
        notification_center.remove_observer(self, sender=chat_stream)

    try:
        audio_stream = next(stream for stream in streams if stream.type == 'audio')
    except StopIteration:
        pass
    else:
        notification_center.remove_observer(self, sender=audio_stream)
        try:
            self.audio_conference.remove(audio_stream)
        except ValueError:
            # User may hangup before getting bridged into the conference
            pass
        remaining = len(self.audio_conference.streams)
        if remaining == 0:
            self.moh_player.pause()
            self.audio_conference.hold()
        elif remaining == 1:
            self.moh_player.play()

    has_file_transfer = any(stream.type == 'file-transfer' for stream in streams)
    if has_file_transfer and len(streams) == 1:
        # A session carrying only a file transfer was never announced as a participant.
        return

    self.dispatch_conference_info()
    log.msg(u'Room %s - %s left conference after %s' % (self.uri, format_identity(session.remote_identity), self.format_session_duration(session)))
    if not self.sessions:
        log.msg(u'Room %s - Last participant left conference' % self.uri)
    # Announce the departure only when this URI has no other session left in the room.
    if remote_uri not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
        self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), self.format_session_duration(session)))

    if ServerConfig.enable_bonjour:
        self._update_bonjour_presence()
def terminate_sessions(self, uri):
    """End every session in the room whose remote URI equals uri.

    Does nothing when the room is not started.
    """
    if not self.started:
        return
    for session in self.sessions:
        if session.remote_identity.uri == uri:
            session.end()
def handle_incoming_subscription(self, subscribe_request, data):
    """Accept or reject an incoming SUBSCRIBE addressed to this room.

    Only the 'conference' event is supported; anything else is rejected with
    489. An accepted subscription is observed, stored in ``self.subscriptions``
    and answered with the current conference-info document.
    """
    log.msg('Room %s - subscription from %s' % (self.uri, data.headers['From'].uri))
    if subscribe_request.event == 'conference':
        NotificationCenter().add_observer(self, sender=subscribe_request)
        self.subscriptions.append(subscribe_request)
        subscribe_request.accept(conference.ConferenceDocument.content_type, self.conference_info)
    else:
        log.msg('Room %s - Subscription rejected: only conference event is supported' % self.uri)
        subscribe_request.reject(489)
def _accept_proposal(self, session, streams):
    """Accept the given proposed streams on session and clear its proposal timer.

    Scheduled through reactor.callLater by the new-proposal handler.
    """
    try:
        session.accept_proposal(streams)
    except IllegalStateError:
        # The session is no longer in a state where the proposal can be accepted.
        pass
    session.proposal_timer = None
def add_file(self, file):
    """Announce the outcome of an upload and, unless it was cancelled, publish the file.

    A file whose status is 'INCOMPLETE' is only announced as cancelled. Any
    other file is announced as uploaded, appended to ``self.files``, included
    in a fresh conference-info push and, when
    ``ConferenceConfig.push_file_transfer`` is set, sent to the other participants.
    """
    cancelled = file.status == 'INCOMPLETE'
    if cancelled:
        template = '%s has cancelled upload of file %s (%s)'
    else:
        template = '%s has uploaded file %s (%s)'
    self.dispatch_server_message(template % (format_identity(file.sender), os.path.basename(file.name), self.format_file_size(file.size)))
    if cancelled:
        return
    # NOTE(review): the original indentation was lost in this copy; the
    # publishing steps below are taken to belong to the non-cancelled branch.
    self.files.append(file)
    self.dispatch_conference_info()
    if ConferenceConfig.push_file_transfer:
        self.dispatch_file(file)
def add_screen_image(self, sender, image):
    """Store a screen-sharing image received from sender.

    One ScreenImage is kept per ``user@host`` in ``self.screen_images``. It is
    created on the first image from that sender and reused afterwards.
    """
    sender_uri = '%s@%s' % (sender.uri.user, sender.uri.host)
    try:
        screen_image = self.screen_images[sender_uri]
    except KeyError:
        # Build the ScreenImage only when it is actually missing;
        # dict.setdefault() would construct a throwaway one for every image.
        screen_image = self.screen_images[sender_uri] = ScreenImage(self, sender)
    screen_image.save(image)
def _update_bonjour_presence(self):
    """Publish the current number of participants as the room's Bonjour presence note."""
    count = len(self.sessions)
    # Small counts are spelled out, everything else is shown as a number.
    count_text = {0: 'No', 1: 'One', 2: 'Two'}.get(count, str(count))
    plural = '' if count == 1 else 's'
    presence_state = BonjourPresenceState('available', u'%s participant%s' % (count_text, plural))
    if self.bonjour_services is not Null:
        self.bonjour_services.presence_state = presence_state
        return
    # This is the room being published all the time
    from sylk.applications.conference import ConferenceApplication
    ConferenceApplication().bonjour_room_service.presence_state = presence_state
@run_in_twisted_thread
def handle_notification(self, notification):
    """Dispatch notification to the ``_NH_<notification name>`` method, if one exists.

    Notifications without a matching handler are absorbed by Null.
    """
    handler_name = '_NH_%s' % notification.name
    getattr(self, handler_name, Null)(notification)
def _NH_RTPStreamDidEnableEncryption(self, notification):
    """Log that a participant's RTP stream enabled encryption, including the encryption type."""
    stream = notification.sender
    participant = format_identity(stream.session.remote_identity)
    log.msg(u'Room %s - %s %s stream enabled %s encryption' % (self.uri, participant, stream.type, stream.encryption.type))
def _NH_RTPStreamDidNotEnableEncryption(self, notification):
    """Log that a participant's RTP stream did not enable encryption, with the reported reason."""
    stream = notification.sender
    participant = format_identity(stream.session.remote_identity)
    log.msg(u'Room %s - %s %s stream did not enable encryption: %s' % (self.uri, participant, stream.type, notification.data.reason))
def _NH_RTPStreamDidTimeout(self, notification):
    """Log an audio stream timeout and end the session if that stream was its only one.

    Timeouts of non-audio streams are ignored.
    """
    stream = notification.sender
    if stream.type == 'audio':
        session = stream.session
        log.msg(u'Room %s - audio stream for session %s timed out' % (self.uri, format_identity(session.remote_identity)))
        if session.streams == [stream]:
            session.end()
def _NH_ChatStreamGotMessage(self, notification):
    """Acknowledge an incoming chat message and route it by content type.

    text/* and image/* messages are reported 200 and queued for the
    dispatcher; application/blink-screensharing is reported 200 and stored as
    a screen image; everything else is reported 413.
    """
    stream = notification.sender
    data = notification.data
    message = data.message
    content_type = message.content_type.lower()
    report = stream.msrp_session.send_report
    if content_type.startswith(('text/', 'image/')):
        report(data.chunk, 200, 'OK')
        self.incoming_message_queue.send((stream.session, 'message', data))
        return
    if content_type == 'application/blink-screensharing':
        report(data.chunk, 200, 'OK')
        self.add_screen_image(message.sender, message.body)
        return
    report(data.chunk, 413, 'Unwanted message')
def _NH_ChatStreamGotComposingIndication(self, notification):
    """Acknowledge an incoming composing indication (200) and queue it for the dispatcher."""
    stream = notification.sender
    data = notification.data
    stream.msrp_session.send_report(data.chunk, 200, 'OK')
    self.incoming_message_queue.send((stream.session, 'composing_indication', data))
def _NH_ChatStreamGotNicknameRequest(self, notification):
    """Set, change or clear the nickname requested by a chat participant.

    A non-empty nickname already held by a different session is rejected with
    425. Otherwise the nickname is recorded per session and per remote URI
    (an empty nickname clears both entries), the request is accepted and the
    conference info is pushed.
    """
    stream = notification.sender
    session = stream.session
    nickname = notification.data.nickname
    chunk = notification.data.chunk
    if nickname:
        taken = nickname in self.session_nickname_map.values()
        # Re-requesting one's own current nickname is not a conflict.
        if taken and self.session_nickname_map.get(session) != nickname:
            stream.reject_nickname(chunk, 425, 'Nickname reserved or already in use')
            return
        self.session_nickname_map[session] = nickname
        self.last_nicknames_map[str(session.remote_identity.uri)] = nickname
    else:
        self.session_nickname_map.pop(session, None)
        self.last_nicknames_map.pop(str(session.remote_identity.uri), None)
    stream.accept_nickname(chunk)
    self.dispatch_conference_info()
def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
    """Forget an ended subscription and stop observing it.

    A subscription that is not in ``self.subscriptions`` is ignored.
    """
    subscription = notification.sender
    if subscription in self.subscriptions:
        self.subscriptions.remove(subscription)
        notification.center.remove_observer(self, sender=subscription)
def _NH_SIPSessionDidChangeHoldState(self, notification):
    """Log a hold-state change made by the remote party and push the conference info."""
    if notification.data.originator != 'remote':
        return
    session = notification.sender
    participant = format_identity(session.remote_identity)
    if notification.data.on_hold:
        log.msg(u'Room %s - %s has put the audio session on hold' % (self.uri, participant))
    else:
        log.msg(u'Room %s - %s has taken the audio session out of hold' % (self.uri, participant))
    # NOTE(review): the original indentation was lost in this copy; the push is
    # taken to happen only for remotely originated changes - confirm.
    self.dispatch_conference_info()
def _NH_SIPSessionNewProposal(self, notification):
    """Handle a stream proposal made by the remote party.

    A proposal without audio or chat streams is rejected at once. Otherwise
    the first proposed audio stream and the first proposed chat stream are
    scheduled to be accepted after 3 seconds; the timer is kept on
    ``session.proposal_timer``.
    """
    if notification.data.originator != 'remote':
        return
    session = notification.sender
    proposed = notification.data.proposed_streams
    audio_streams = [stream for stream in proposed if stream.type == 'audio']
    chat_streams = [stream for stream in proposed if stream.type == 'chat']
    if not (audio_streams or chat_streams):
        session.reject_proposal()
        return
    # At most one stream of each supported type is accepted.
    accepted = [group[0] for group in (audio_streams, chat_streams) if group]
    timer = reactor.callLater(3, self._accept_proposal, session, accepted)
    previous_timer = getattr(session, 'proposal_timer', None)
    assert previous_timer is None
    session.proposal_timer = timer
def _NH_SIPSessionProposalRejected(self, notification):
    """Drop the pending accept timer when a remotely originated proposal is rejected.

    The timer is cancelled only while it is still pending, as cancelling a
    delayed call that already ran or was cancelled raises an error in Twisted.
    """
    if notification.data.originator != 'remote':
        return
    session = notification.sender
    timer = getattr(session, 'proposal_timer', None)
    if timer is not None:
        if timer.active():
            timer.cancel()
        session.proposal_timer = None
def _NH_SIPSessionHadProposalFailure(self, notification):
    """Drop the pending accept timer when a remotely originated proposal fails.

    The timer may already be gone: _accept_proposal clears
    ``session.proposal_timer`` once it has run, so a failure reported after
    that point finds no timer. It is therefore cancelled only when present and
    still pending, instead of asserting its existence.
    """
    if notification.data.originator != 'remote':
        return
    session = notification.sender
    timer = getattr(session, 'proposal_timer', None)
    if timer is not None and timer.active():
        timer.cancel()
    session.proposal_timer = None
def _NH_SIPSessionDidRenegotiateStreams(self, notification):
    """React to streams being added to or removed from an established session.

    Added streams are observed, logged and announced to the other
    participants, then welcomed through a WelcomeHandler. Removed streams are
    no longer observed, are announced, and removed audio is taken out of the
    audio conference. A session left without streams is ended. The conference
    info is pushed at the end in every case.
    """
    session = notification.sender
    center = notification.center
    added_streams = notification.data.added_streams
    removed_streams = notification.data.removed_streams

    for stream in added_streams:
        center.add_observer(self, sender=stream)
        txt = u'%s has added %s' % (format_identity(session.remote_identity), stream.type)
        log.msg(u'Room %s - %s' % (self.uri, txt))
        self.dispatch_server_message(txt, exclude=session)
        if stream.type != 'audio':
            continue
        log.msg(u'Room %s - audio stream %s/%sHz, end-points: %s:%d <-> %s:%d' % (self.uri, stream.codec, stream.sample_rate,
                                                                                 stream.local_rtp_address, stream.local_rtp_port,
                                                                                 stream.remote_rtp_address, stream.remote_rtp_port))
        if stream.encryption.type == 'ZRTP':
            continue
        # We don't listen for stream notifications early enough
        if stream.encryption.active:
            log.msg(u'Room %s - %s %s stream enabled %s encryption' % (self.uri,
                                                                      format_identity(session.remote_identity),
                                                                      stream.type,
                                                                      stream.encryption.type))
        else:
            log.msg(u'Room %s - %s %s stream did not enable encryption' % (self.uri,
                                                                          format_identity(session.remote_identity),
                                                                          stream.type))

    if added_streams:
        welcome_handler = WelcomeHandler(self, initial=False, session=session, streams=added_streams)
        welcome_handler.run()

    for stream in removed_streams:
        center.remove_observer(self, sender=stream)
        txt = u'%s has removed %s' % (format_identity(session.remote_identity), stream.type)
        log.msg(u'Room %s - %s' % (self.uri, txt))
        self.dispatch_server_message(txt, exclude=session)
        if stream.type != 'audio':
            continue
        try:
            self.audio_conference.remove(stream)
        except ValueError:
            # User may hangup before getting bridged into the conference
            pass
        if len(self.audio_conference.streams) == 0:
            self.moh_player.pause()
            self.audio_conference.hold()
        elif len(self.audio_conference.streams) == 1:
            self.moh_player.play()

    if not session.streams:
        log.msg(u'Room %s - %s has removed all streams, session will be terminated' % (self.uri, format_identity(session.remote_identity)))
        session.end()

    self.dispatch_conference_info()
def _NH_SIPSessionTransferNewIncoming(self, notification):
    """Reject an in-dialog call transfer request with 403 and log why."""
    session = notification.sender
    log.msg(u'Room %s - Call transfer request rejected, REFER must be out of dialog (RFC4579 5.5)' % self.uri)
    session.reject_transfer(403)
def _NH_SIPSessionWillEnd(self, notification):
    """Cancel a still-pending proposal accept timer when the session is about to end.

    ``session.proposal_timer`` is reset to None in every case.
    """
    session = notification.sender
    timer = getattr(session, 'proposal_timer', None)
    # Twisted delayed calls expose active(), not isActive(); this matches the
    # other timer checks in this file.
    if timer is not None and timer.active():
        timer.cancel()
    session.proposal_timer = None
@staticmethod
def format_stream_types(streams):
    """Describe the stream types of streams as 'with a,b and c'.

    Returns an empty string for no streams and 'with <type>' for a single one.
    """
    if not streams:
        return ''
    if len(streams) == 1:
        return 'with %s' % streams[0].type
    leading = ','.join(stream.type for stream in streams[:-1])
    return 'with %s and %s' % (leading, streams[-1].type)
@staticmethod
def format_conference_stream_type(stream):
    """Return the media type name used for stream in the conference payload.

    'chat' streams are reported as 'message'; any other type is returned as is.
    """
    return 'message' if stream.type == 'chat' else stream.type
@staticmethod
def format_session_duration(session):
    """Format the time between session.start_time and session.end_time.

    Returns '0s' when the session has no start time, '<n> seconds' below one
    minute, 'MM:SS' below one hour and 'HH:MM:SS' otherwise (days are folded
    into the hours). The duration is rounded to the nearest second.
    """
    if not session.start_time:
        return '0s'
    elapsed = session.end_time - session.start_time
    total_seconds = elapsed.seconds
    if elapsed.microseconds >= 500000:
        total_seconds += 1
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    hours += elapsed.days * 24
    if hours:
        return '%02d:%02d:%02d' % (hours, minutes, seconds)
    if minutes:
        return '%02d:%02d' % (minutes, seconds)
    return '%d seconds' % seconds
@staticmethod
def format_file_size(size):
    """Format a size in bytes as a human readable string.

    Sizes below 1024 are shown in bytes; larger sizes in KB, MB or GB, with
    two decimals below ten units and one decimal from ten units up.
    """
    kilobyte = 1024.0
    megabyte = 1024 * 1024.0
    gigabyte = 1024 * 1024 * 1024.0
    # (exclusive upper bound, template, divisor), checked in ascending order
    thresholds = (
        (1024, '%d bytes', 1),
        (10 * 1024, '%.2f KB', kilobyte),
        (1024 * 1024, '%.1f KB', kilobyte),
        (10 * 1024 * 1024, '%.2f MB', megabyte),
        (1024 * 1024 * 1024, '%.1f MB', megabyte),
        (10 * 1024 * 1024 * 1024, '%.2f GB', gigabyte),
        (float('infinity'), '%.1f GB', gigabyte),
    )
    for upper_bound, template, divisor in thresholds:
        if size < upper_bound:
            return template % (size / divisor,)
    # Only reached when size compares below none of the bounds.
    return "%d bytes" % size
class MoHPlayer(object):
    """Music-on-hold player attached to an audio conference bridge.

    The available .wav files are shuffled once and then played in an endless
    cycle while the player is not paused. When no file is found the player
    stays disabled and play(), pause() and stop() do nothing.
    """
    implements(IObserver)

    def __init__(self, conference):
        self.conference = conference
        self.files = None
        self.paused = None
        self._player = None

    def start(self):
        """Locate the music files and add a paused wave player to the conference bridge."""
        files = glob('%s/*.wav' % Resources.get('sounds/moh'))
        if not files:
            log.error(u'No files found, MoH is disabled')
            return
        random.shuffle(files)
        self.files = cycle(files)
        self._player = WavePlayer(SIPApplication.voice_audio_mixer, '', pause_time=1, initial_delay=1, volume=20)
        self.paused = True
        self.conference.bridge.add(self._player)
        NotificationCenter().add_observer(self, sender=self._player)

    def stop(self):
        """Stop playback and detach the player from the conference bridge.

        Safe to call more than once, or without a successful start().
        """
        if self._player is None:
            return
        NotificationCenter().remove_observer(self, sender=self._player)
        self._player.stop()
        self.paused = True
        self.conference.bridge.remove(self._player)
        self.conference = None
        # Forget the player so a later play()/pause()/stop() becomes a no-op
        # instead of operating on a detached player.
        self._player = None

    def play(self):
        """Resume playback with the next file, if currently paused."""
        if self._player is not None and self.paused:
            self.paused = False
            self._play_next_file()

    def pause(self):
        """Pause playback, if currently playing."""
        if self._player is not None and not self.paused:
            self.paused = True
            self._player.stop()

    def _play_next_file(self):
        self._player.filename = next(self.files)
        self._player.play()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_WavePlayerDidFail(self, notification):
        # Move on to the next file whether the previous one ended or failed.
        if not self.paused:
            self._play_next_file()

    _NH_WavePlayerDidEnd = _NH_WavePlayerDidFail
class WelcomeHandler(object):
    """Welcomes a session that joined the room or added streams to it.

    Audio streams get spoken announcements before being bridged into the audio
    conference; chat streams get a text greeting followed by the room history.
    Each stream is served in its own proc; all procs are killed when the
    session is about to end.
    """
    implements(IObserver)

    def __init__(self, room, initial, session, streams):
        self.room = room
        self.initial = initial
        self.session = session
        self.streams = streams
        self.procs = proc.RunningProcSet()

    @run_in_green_thread
    def run(self):
        """Welcome every audio and chat stream, wait for completion, then drop all references."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self.session)

        for stream in self.streams:
            if stream.type == 'audio':
                self.procs.spawn(self.audio_welcome, stream)
            elif stream.type == 'chat':
                self.procs.spawn(self.chat_welcome, stream)
        self.procs.waitall()

        notification_center.remove_observer(self, sender=self.session)
        self.session = None
        self.streams = None
        self.room = None
        self.procs = None

    def play_file_in_player(self, player, file, delay):
        """Play file in player and wait for it to finish; playback errors are logged."""
        player.filename = file
        player.pause_time = delay
        try:
            player.play().wait()
        except WavePlayerError as e:
            log.warning(u"Error playing file %s: %s" % (file, e))

    def audio_welcome(self, stream):
        """Play the spoken welcome on stream, then bridge it into the audio conference."""
        player = WavePlayer(stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
        stream.bridge.add(player)
        try:
            if self.initial:
                file = Resources.get('sounds/co_welcome_conference.wav')
                self.play_file_in_player(player, file, 1)
            # Number of other participants (distinct URIs) that have an audio stream.
            audio_uris = set(str(s.remote_identity.uri) for s in self.room.sessions if any(st for st in s.streams if st.type == 'audio'))
            user_count = len(audio_uris - set([str(self.session.remote_identity.uri)]))
            if user_count == 0:
                file = Resources.get('sounds/co_only_one.wav')
                self.play_file_in_player(player, file, 0.5)
            elif user_count == 1:
                file = Resources.get('sounds/co_there_is_one.wav')
                self.play_file_in_player(player, file, 0.5)
            elif user_count < 100:
                file = Resources.get('sounds/co_there_are.wav')
                self.play_file_in_player(player, file, 0.2)
                if user_count <= 24:
                    file = Resources.get('sounds/bi_%d.wav' % user_count)
                    self.play_file_in_player(player, file, 0.1)
                else:
                    # Numbers above 24 are spoken as tens followed by units.
                    # NOTE(review): for multiples of ten this also plays bi_0.wav - confirm intended.
                    file = Resources.get('sounds/bi_%d0.wav' % (user_count // 10))
                    self.play_file_in_player(player, file, 0.1)
                    file = Resources.get('sounds/bi_%d.wav' % (user_count % 10))
                    self.play_file_in_player(player, file, 0.1)
                file = Resources.get('sounds/co_more_participants.wav')
                self.play_file_in_player(player, file, 0)
            file = Resources.get('sounds/connected_tone.wav')
            self.play_file_in_player(player, file, 0.1)
        except proc.ProcExit:
            # No need to remove the bridge from the stream, it's done automatically
            pass
        else:
            stream.bridge.remove(player)
            self.room.audio_conference.add(stream)
            self.room.audio_conference.unhold()
            if len(self.room.audio_conference.streams) == 1:
                self.room.moh_player.play()
            else:
                self.room.moh_player.pause()
        finally:
            player.stop()

    def chat_welcome(self, stream):
        """Send the text greeting and the room's message history over stream."""
        if self.initial:
            txt = 'Welcome to SylkServer!'
        else:
            txt = ''
        user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions) - set([str(self.session.remote_identity.uri)]))
        if user_count == 0:
            txt += ' You are the first participant'
        else:
            if user_count == 1:
                txt += ' There is one more participant'
            else:
                txt += ' There are %s more participants' % user_count
        txt += ' in this conference room.'
        if not ServerConfig.enable_bonjour:
            config = self.room.config
            if config.advertise_xmpp_support or config.pstn_access_numbers:
                txt += '\n\nOther participants can join at these addresses:\n\n'
            if config.pstn_access_numbers:
                if len(config.pstn_access_numbers) == 1:
                    nums = config.pstn_access_numbers[0]
                else:
                    nums = ', '.join(config.pstn_access_numbers[:-1]) + ' or %s' % config.pstn_access_numbers[-1]
                txt += ' - Using a landline or mobile phone, dial %s (audio)\n' % nums
            if config.advertise_xmpp_support:
                txt += ' - Using an XMPP client, connect to group chat room %s (chat)\n' % self.room.uri
                txt += ' - Using an XMPP Jingle capable client, add contact %s and call it (audio)\n' % self.room.uri
                # NOTE(review): the original indentation was lost in this copy; the SIP
                # line is taken to belong to the XMPP block - confirm against the repository.
                txt += ' - Using a SIP client, initiate a session to %s (audio and chat)\n' % self.room.uri
        stream.send_message(txt, 'text/plain', sender=self.room.identity, recipients=[self.room.identity])
        for msg in self.room.history:
            stream.send_message(msg.body, msg.content_type, sender=msg.sender, recipients=[self.room.identity], timestamp=msg.timestamp)

    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionWillEnd(self, notification):
        # Abort any welcome still in progress for the ending session.
        self.procs.killall()
class RoomFile(object):
    """Record describing a file that was uploaded to a room.

    Holds the file name, hash, size, sender and upload status exactly as given
    to the constructor.
    """

    def __init__(self, name, hash, size, sender, status):
        self.name, self.hash, self.size = name, hash, size
        self.sender = sender
        self.status = status

    @property
    def file_selector(self):
        """A FileSelector built for the file, from its UTF-8 encoded name and stored hash."""
        encoded_name = self.name.encode('utf-8')
        return FileSelector.for_file(encoded_name, hash=self.hash)
class IncomingFileTransferHandler(object):
    """Receives a file uploaded to a room through a 'recvonly' file-transfer stream.

    Chunks are written to a file below
    ``ConferenceConfig.file_transfer_dir/<room uri>`` while a SHA-1 hash is
    computed. When the session ends the result is reported to the room (if it
    still exists) as a RoomFile whose status is 'OK', 'INCOMPLETE',
    'Hash missmatch' or the text of a write error.
    """
    implements(IObserver)

    def __init__(self, room, session):
        # Only a weak reference is kept so the handler does not keep the room alive.
        self.room = weakref.ref(room)
        self.room_uri = room.uri
        self.session = session
        self.stream = next(stream for stream in self.session.streams if stream.type == 'file-transfer' and stream.direction == 'recvonly')
        self.error = False
        self.ended = False
        self.file = None
        self.file_selector = None
        self.filename = None
        self.hash = None
        self.status = None
        self.timer = None
        self.transfer_finished = False

    def start(self):
        """Open the destination file and start observing the session and the stream.

        The session is ended when the destination file cannot be opened.
        """
        self.file_selector = self.stream.file_selector
        path = os.path.join(ConferenceConfig.file_transfer_dir, self.room_uri)
        makedirs(path)
        # The name is chosen by the remote party: keep only its last path
        # component so it cannot point outside the room's directory (absolute
        # paths or '..' components).
        requested_name = os.path.basename(self.file_selector.name.decode('utf-8'))
        if requested_name in ('', '.', '..'):
            requested_name = 'unnamed'
        self.filename = filename = os.path.join(path, requested_name)
        basename, ext = os.path.splitext(filename)
        i = 1
        # Never overwrite an existing upload: append _1, _2, ... until the name is free.
        while os.path.exists(filename):
            filename = '%s_%d%s' % (basename, i, ext)
            i += 1
        self.filename = filename
        try:
            self.file = open(self.filename, 'wb')
        except EnvironmentError:
            log.msg('Room %s - cannot write destination filename: %s' % (self.room_uri, self.filename))
            self.session.end()
            return
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=self)
        notification_center.add_observer(self, sender=self.session)
        notification_center.add_observer(self, sender=self.stream)
        self.hash = hashlib.sha1()

    @run_in_thread('file-transfer')
    def write_chunk(self, data):
        """Write one chunk to the file; None marks the end of the write operation."""
        notification_center = NotificationCenter()
        if data is not None:
            try:
                self.file.write(data)
            except EnvironmentError as e:
                notification_center.post_notification('IncomingFileTransferHandlerGotError', sender=self, data=NotificationData(error=str(e)))
            else:
                self.hash.update(data)
        else:
            self.file.close()
            if self.error:
                notification_center.post_notification('IncomingFileTransferHandlerDidFail', sender=self)
            else:
                notification_center.post_notification('IncomingFileTransferHandlerDidEnd', sender=self)

    @run_in_thread('file-io')
    def remove_bogus_file(self, filename):
        """Delete filename, ignoring the error if it cannot be removed."""
        try:
            os.unlink(filename)
        except OSError:
            pass

    @run_in_twisted_thread
    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPSessionDidEnd(self, notification):
        self.ended = True
        if self.timer is not None and self.timer.active():
            self.timer.cancel()
        self.timer = None
        notification.center.remove_observer(self, sender=self.stream)
        notification.center.remove_observer(self, sender=self.session)
        # Mark end of write operation
        self.write_chunk(None)

    def _NH_FileTransferStreamGotChunk(self, notification):
        self.write_chunk(notification.data.content)

    def _NH_FileTransferStreamDidFinish(self, notification):
        self.transfer_finished = True
        if self.timer is None:
            # End the session ourselves if it is still up 5 seconds after the transfer finished.
            self.timer = reactor.callLater(5, self.session.end)

    def _NH_IncomingFileTransferHandlerGotError(self, notification):
        log.error('Error while handling incoming file transfer: %s' % notification.data.error)
        self.error = True
        self.status = notification.data.error
        if not self.ended and self.timer is None:
            self.timer = reactor.callLater(5, self.session.end)

    def _NH_IncomingFileTransferHandlerDidEnd(self, notification):
        notification.center.remove_observer(self, sender=self)

        remote_hash = self.file_selector.hash
        if not self.transfer_finished:
            log.msg('File transfer of %s cancelled' % os.path.basename(self.filename))
            self.remove_bogus_file(self.filename)
            self.status = 'INCOMPLETE'
        else:
            # Render the digest as 'sha1:' followed by colon separated upper-case byte pairs.
            local_hash = 'sha1:' + ':'.join(re.findall(r'..', self.hash.hexdigest().upper()))
            if local_hash != remote_hash:
                log.warning('Hash of transferred file does not match the remote hash (file may have changed).')
                self.status = 'Hash missmatch'
                self.remove_bogus_file(self.filename)
            else:
                self.status = 'OK'

        sender = CPIMIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name)
        file = RoomFile(self.filename, remote_hash, self.file_selector.size, sender, self.status)
        # The room may be gone by now; Null silently absorbs the call in that case.
        room = self.room() or Null
        room.add_file(file)

        self.session = None
        self.stream = None

    def _NH_IncomingFileTransferHandlerDidFail(self, notification):
        notification.center.remove_observer(self, sender=self)

        sender = CPIMIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name)
        file = RoomFile(self.filename, self.file_selector.hash, self.file_selector.size, sender, self.status)
        room = self.room() or Null
        room.add_file(file)

        self.session = None
        self.stream = None
class OutgoingFileTransferRequestHandler(object):
    """Supervises a session through which a participant requested a file from the room.

    The handler only watches the session and its file-transfer stream: once the
    stream reports that it finished, the session is ended 2 seconds later.
    """
    implements(IObserver)

    def __init__(self, room, session):
        self.room = weakref.ref(room)
        self.session = session
        self.stream = next(stream for stream in session.streams if stream.type == 'file-transfer')
        self.timer = None

    def start(self):
        """Start observing the session and its file-transfer stream."""
        notification_center = NotificationCenter()
        for sender in (self.session, self.stream):
            notification_center.add_observer(self, sender=sender)

    @run_in_twisted_thread
    def handle_notification(self, notification):
        handler_name = '_NH_%s' % notification.name
        getattr(self, handler_name, Null)(notification)

    def _NH_FileTransferStreamDidFinish(self, notification):
        if self.timer is not None:
            return
        self.timer = reactor.callLater(2, self.session.end)

    def _NH_SIPSessionDidEnd(self, notification):
        timer = self.timer
        if timer is not None and timer.active():
            timer.cancel()
        self.timer = None
        notification_center = NotificationCenter()
        for sender in (self.stream, self.session):
            notification_center.remove_observer(self, sender=sender)
        self.session = None
        self.stream = None

    # A failed session is cleaned up exactly like an ended one.
    _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd
class InterruptFileTransfer(Exception):
    """Exception type declared alongside the outgoing file transfer handler."""
class OutgoingFileTransferHandler(object):
    """Send a room file to a destination over a new outgoing session.

    `start` resolves a route, creates the session with a send-only file
    transfer stream and connects it. The session is ended two seconds after
    the stream finished, and all references are dropped when the session ends
    or fails.
    """
    implements(IObserver)

    def __init__(self, room, destination, file):
        self.room_uri = room.identity.uri
        self.destination = destination
        self.file = file
        self.session = None
        self.stream = None
        self.timer = None

    @run_in_green_thread
    def start(self):
        """Look up a route and connect the file transfer session.

        Returns without creating a session when the DNS lookup fails or
        yields no route.
        """
        self.greenlet = api.getcurrent()
        settings = SIPSimpleSettings()
        account = DefaultAccount()
        proxy = account.sip.outbound_proxy
        if proxy is None:
            uri = SIPURI.new(self.destination)
        else:
            # A configured outbound proxy takes precedence over the destination itself
            uri = SIPURI(host=proxy.host,
                         port=proxy.port,
                         parameters={'transport': proxy.transport})
        lookup = DNSLookup()
        try:
            routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait()
            route = routes[0]
        except (DNSLookupError, IndexError):
            return
        center = NotificationCenter()
        self.session = Session(account)
        self.stream = FileTransferStream(self.file.file_selector, 'sendonly')
        center.add_observer(self, sender=self.session)
        center.add_observer(self, sender=self.stream)
        from_header = FromHeader(SIPURI.new(self.room_uri), u'Conference File Transfer')
        to_header = ToHeader(SIPURI.new(self.destination))
        extra_headers = []
        if ThorNodeConfig.enabled:
            extra_headers.append(Header('Thor-Scope', 'conference-invitation'))
        extra_headers += [Header('X-Originator-From', str(self.file.sender.uri)),
                          SubjectHeader(u'File uploaded by %s' % self.file.sender)]
        self.session.connect(from_header, to_header, route=route, streams=[self.stream], is_focus=True, extra_headers=extra_headers)

    def stop(self):
        """End the session, if one was created."""
        session = self.session
        if session is not None:
            session.end()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Dispatch to the `_NH_<name>` method matching the notification, if any."""
        getattr(self, '_NH_%s' % notification.name, Null)(notification)

    def _NH_FileTransferStreamDidFinish(self, notification):
        """Schedule the end of the session, unless that was already scheduled."""
        if self.timer is not None:
            return
        self.timer = reactor.callLater(2, self.session.end)

    def _NH_SIPSessionDidEnd(self, notification):
        """Cancel a pending timer, stop observing and release session and stream."""
        pending = self.timer
        if pending is not None and pending.active():
            pending.cancel()
        self.timer = None
        center = NotificationCenter()
        center.remove_observer(self, sender=self.stream)
        center.remove_observer(self, sender=self.session)
        self.session = None
        self.stream = None

    # A failed session is cleaned up exactly like an ended one
    _NH_SIPSessionDidFail = _NH_SIPSessionDidEnd
diff --git a/sylk/applications/ircconference/room.py b/sylk/applications/ircconference/room.py
index e7f1cd4..f104c20 100644
--- a/sylk/applications/ircconference/room.py
+++ b/sylk/applications/ircconference/room.py
@@ -1,679 +1,679 @@
# Copyright (C) 2011 AG Projects. See LICENSE for details.
#
import random
import urllib
import lxml.html
import lxml.html.clean
from itertools import count
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from eventlib import coros, proc
from sipsimple.audio import AudioConference, WavePlayer, WavePlayerError
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SIPURI, SIPCoreError, SIPCoreInvalidStateError
from sipsimple.payloads.conference import Conference, ConferenceDocument, ConferenceDescription, ConferenceState, Endpoint, EndpointStatus, HostInfo, JoiningInfo, Media, User, Users, WebPage
from sipsimple.streams.applications.chat import CPIMIdentity
from sipsimple.streams.msrp import ChatStreamError
from sipsimple.threading import run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from twisted.internet import protocol, reactor
from twisted.words.protocols import irc
from zope.interface import implements
from sylk.applications.ircconference.configuration import get_room_configuration
from sylk.applications.ircconference.logger import log
-from sylk.configuration.datatypes import ResourcePath
+from sylk.resources import Resources
def format_identity(identity, cpim_format=False):
    """Return a printable form of a SIP identity.

    identity    -- object with a `uri` (providing `user` and `host`) and a
                   `display_name` attribute
    cpim_format -- when true, an identity without display name is returned
                   wrapped in angle brackets (`<sip:user@host>`), the form in
                   which this module hands addresses to CPIMIdentity.parse

    With a display name the result is always `Name <sip:user@host>`.
    """
    uri = identity.uri
    address = u'sip:%s@%s' % (uri.user, uri.host)
    if identity.display_name:
        return u'%s <%s>' % (identity.display_name, address)
    if cpim_format:
        return u'<%s>' % address
    return address
def format_stream_types(streams):
    """Describe the stream types of a session for log and chat messages.

    Returns '' when there are no streams, 'with X' for a single stream and
    'with X,Y and Z' otherwise (all but the last type joined by commas).
    """
    if not streams:
        return ''
    stream_types = [stream.type for stream in streams]
    if len(stream_types) == 1:
        return 'with %s' % stream_types[0]
    leading = ','.join(stream_types[:-1])
    return 'with %s' % leading + ' and %s' % stream_types[-1]
def format_session_duration(session):
    """Return the duration of a session as text.

    The duration is `session.end_time - session.start_time`, rounded to the
    nearest second. It is rendered as 'N seconds' below one minute, as 'MM:SS'
    below one hour and as 'HH:MM:SS' otherwise (whole days are folded into the
    hours). A session without start time yields '0s'.
    """
    if not session.start_time:
        return '0s'
    duration = session.end_time - session.start_time
    seconds = duration.seconds
    if duration.microseconds >= 500000:
        # Round half a second or more up to the next full second
        seconds += 1
    # divmod keeps the parts integral regardless of the division semantics in effect
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    hours += duration.days * 24
    if not minutes and not hours:
        return '%d seconds' % seconds
    if not hours:
        return '%02d:%02d' % (minutes, seconds)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)
def format_conference_stream_type(stream):
    """Return the media type name used for `stream` in conference-info documents.

    Chat streams are reported as 'message'; every other stream is reported
    under its own type.
    """
    return 'message' if stream.type == 'chat' else stream.type
def html2text(data):
    """Return the text content of an HTML document, or '' if it cannot be processed.

    The document is parsed with lxml, passed through a Cleaner created with
    style=True and its text content is returned without leading and trailing
    newlines. Conversion is best effort: any exception results in ''.
    """
    try:
        parsed = lxml.html.document_fromstring(data)
        cleaned = lxml.html.clean.Cleaner(style=True).clean_html(parsed)
        text = cleaned.text_content()
        return text.strip('\n')
    except Exception:
        return ''
class IRCMessage(object):
    """A message received from IRC, in the shape the room dispatches to chat streams.

    sender       -- CPIMIdentity built from `uri`, with `username` as display name
    body         -- the message text
    content_type -- MIME type of the body ('text/plain' unless given)
    """

    def __init__(self, username, uri, body, content_type='text/plain'):
        self.sender = CPIMIdentity(uri, display_name=username)
        self.content_type = content_type
        self.body = body
class IRCRoom(object):
    """
    Object representing a conference room, it will handle the message dispatching
    among all the participants.

    A room relays chat messages between the SIP sessions added to it and the
    IRC channel configured for the room, bridges the audio streams of its
    sessions in an AudioConference and pushes conference-info documents to the
    accepted subscriptions. Instances are created through the Singleton
    metaclass. `state` is either 'stopped' or 'started'.
    """
    __metaclass__ = Singleton
    implements(IObserver)

    def __init__(self, uri):
        """Create a stopped room; `uri` is the room address in `user@host` form."""
        self.uri = uri
        self.identity = CPIMIdentity.parse('<sip:%s>' % self.uri)
        self.sessions = []
        self.sessions_with_proposals = []
        self.subscriptions = []
        # Messages for IRC collected while no IRC connection is available
        self.pending_messages = []
        self.state = 'stopped'
        self.incoming_message_queue = coros.queue()
        self.message_dispatcher = None
        self.audio_conference = None
        self.conference_info_payload = None
        self.conference_info_version = count(1)
        self.irc_connector = None
        self.irc_protocol = None

    @classmethod
    def get_room(cls, uri):
        """Return the room for a SIP URI, identified by its `user@host` part."""
        room_uri = '%s@%s' % (uri.user, uri.host)
        return cls(room_uri)

    @property
    def empty(self):
        """True when no session is part of the room."""
        return len(self.sessions) == 0

    @property
    def started(self):
        """True while the room is in the 'started' state."""
        return self.state == 'started'

    @staticmethod
    def _find_stream(session, stream_type):
        """Return the first stream of `session` with the given type, or None."""
        return next((stream for stream in session.streams or [] if stream.type == stream_type), None)

    @staticmethod
    def _cpim_identity(identity):
        """Return a CPIMIdentity with the `user@host` address and display name of `identity`."""
        text = format_identity(identity)
        if not identity.display_name:
            # Without display name format_identity returns a bare address; wrap it
            # in angle brackets, the form the room's own identity is parsed from
            text = u'<%s>' % text
        return CPIMIdentity.parse(text)

    def start(self):
        """Connect to the configured IRC server and start dispatching; no-op unless stopped."""
        if self.state != 'stopped':
            return
        config = get_room_configuration(self.uri.split('@')[0])
        factory = IRCBotFactory(config)
        host, port = config.server
        self.irc_connector = reactor.connectTCP(host, port, factory)
        NotificationCenter().add_observer(self, sender=self.irc_connector.factory)
        self.message_dispatcher = proc.spawn(self._message_dispatcher)
        self.audio_conference = AudioConference()
        self.audio_conference.hold()
        self.state = 'started'

    def stop(self):
        """Disconnect from IRC and stop dispatching; no-op unless started."""
        if self.state != 'started':
            return
        self.state = 'stopped'
        NotificationCenter().remove_observer(self, sender=self.irc_connector.factory)
        self.irc_connector.factory.stop_requested = True
        self.irc_connector.disconnect()
        self.irc_connector = None
        # The factory is no longer observed, so IRCBotGotDisconnected will not be
        # delivered for this disconnect; forget the protocol here instead
        self.irc_protocol = None
        self.message_dispatcher.kill(proc.ProcExit)
        self.audio_conference = None

    def _message_dispatcher(self):
        """Read from self.incoming_message_queue and dispatch the messages to other participants"""
        while True:
            session, message_type, data = self.incoming_message_queue.wait()
            if message_type == 'msrp_message':
                if data.sender.uri != session.remote_identity.uri:
                    # Drop messages claiming a foreign sender, but keep serving the queue
                    continue
                self.dispatch_message(session, data)
            elif message_type == 'irc_message':
                self.dispatch_irc_message(data)

    def dispatch_message(self, session, message):
        """Send a chat message received on `session` to the chat streams of all other sessions."""
        recipients = [s for s in self.sessions if s is not session]
        if not recipients:
            return
        identity = self._cpim_identity(session.remote_identity)
        for s in recipients:
            chat_stream = self._find_stream(s, 'chat')
            if chat_stream is None:
                continue
            try:
                chat_stream.send_message(message.body, message.content_type, sender=identity, recipients=[self.identity], timestamp=message.timestamp)
            except ChatStreamError as e:
                log.error(u'Error dispatching message to %s: %s' % (s.remote_identity.uri, e))

    def dispatch_irc_message(self, message):
        """Send a message received from IRC to the chat streams of all sessions."""
        for session in self.sessions:
            chat_stream = self._find_stream(session, 'chat')
            if chat_stream is None:
                continue
            try:
                chat_stream.send_message(message.body, message.content_type, sender=message.sender, recipients=[self.identity])
            except ChatStreamError as e:
                log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e))

    def dispatch_server_message(self, body, content_type='text/plain', exclude=None):
        """Send a message in the name of the room to the chat streams of all sessions except `exclude`."""
        for session in self.sessions:
            if session is exclude:
                continue
            chat_stream = self._find_stream(session, 'chat')
            if chat_stream is None:
                continue
            try:
                chat_stream.send_message(body, content_type, sender=self.identity, recipients=[self.identity])
            except ChatStreamError as e:
                log.error(u'Error dispatching message to %s: %s' % (session.remote_identity.uri, e))

    def get_conference_info(self):
        """Trigger the distribution of an updated conference-info document."""
        # Send request to get participants list, we'll get a notification with it
        if self.irc_protocol is not None:
            self.irc_protocol.get_participants()
        else:
            self.dispatch_conference_info([])

    def dispatch_conference_info(self, irc_participants):
        """Push the current conference-info document to all active subscriptions."""
        data = self.build_conference_info_payload(irc_participants)
        for subscription in self.subscriptions:
            if subscription.state != 'active':
                continue
            try:
                subscription.push_content(ConferenceDocument.content_type, data)
            except (SIPCoreError, SIPCoreInvalidStateError):
                pass

    def build_conference_info_payload(self, irc_participants):
        """Return the conference-info document as XML.

        The document lists one user per distinct remote URI among the
        sessions, with one endpoint per session, followed by one user per
        nickname in `irc_participants`. Every call increments the version.
        """
        irc_configuration = get_room_configuration(self.uri.split('@')[0])
        if self.conference_info_payload is None:
            settings = SIPSimpleSettings()
            conference_description = ConferenceDescription(display_text='#%s on %s' % (irc_configuration.channel, irc_configuration.server[0]), free_text='Hosted by %s' % settings.user_agent)
            host_info = HostInfo(web_page=WebPage(irc_configuration.website))
            self.conference_info_payload = Conference(self.identity.uri, conference_description=conference_description, host_info=host_info, users=Users())
        self.conference_info_payload.version = next(self.conference_info_version)
        user_count = len(set(str(s.remote_identity.uri) for s in self.sessions)) + len(irc_participants)
        self.conference_info_payload.conference_state = ConferenceState(user_count=user_count, active=True)
        users = Users()
        for session in self.sessions:
            entity = str(session.remote_identity.uri)
            try:
                user = next(user for user in users if user.entity == entity)
            except StopIteration:
                user = User(entity, display_text=session.remote_identity.display_name)
                users.add(user)
            joining_info = JoiningInfo(when=session.start_time)
            holdable_streams = [stream for stream in session.streams if stream.hold_supported]
            # On hold only if there is at least one holdable stream and the remote holds all of them
            session_on_hold = holdable_streams and all(stream.on_hold_by_remote for stream in holdable_streams)
            hold_status = EndpointStatus('on-hold' if session_on_hold else 'connected')
            endpoint = Endpoint(str(session._invitation.remote_contact_header.uri), display_text=session.remote_identity.display_name, joining_info=joining_info, status=hold_status)
            for stream in session.streams:
                endpoint.add(Media(id(stream), media_type=format_conference_stream_type(stream)))
            user.add(endpoint)
        for nick in irc_participants:
            irc_uri = '%s@%s' % (urllib.quote(nick), irc_configuration.server[0])
            user = User(irc_uri, display_text=nick)
            users.add(user)
            endpoint = Endpoint(irc_uri, display_text=nick)
            endpoint.add(Media(random.randint(100000000, 999999999), media_type='message'))
            user.add(endpoint)
        self.conference_info_payload.users = users
        return self.conference_info_payload.toxml()

    def add_session(self, session):
        """Add a session to the room, observe it and its streams and announce the new participant."""
        notification_center = NotificationCenter()
        notification_center.add_observer(self, sender=session)
        self.sessions.append(session)
        chat_stream = self._find_stream(session, 'chat')
        if chat_stream is not None:
            notification_center.add_observer(self, sender=chat_stream)
        audio_stream = self._find_stream(session, 'audio')
        if audio_stream is not None:
            notification_center.add_observer(self, sender=audio_stream)
            log.msg(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (audio_stream.codec, audio_stream.sample_rate,
                                                                                audio_stream.local_rtp_address, audio_stream.local_rtp_port,
                                                                                audio_stream.remote_rtp_address, audio_stream.remote_rtp_port))
        # The welcome handler returns right away for sessions without an audio stream
        welcome_handler = WelcomeHandler(self, session)
        welcome_handler.start()
        self.get_conference_info()
        if len(self.sessions) == 1:
            log.msg(u'%s started conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
        else:
            log.msg(u'%s joined conference %s %s' % (format_identity(session.remote_identity), self.uri, format_stream_types(session.streams)))
        # Announce the participant only if it has no other session in the room
        if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
            self.dispatch_server_message('%s has joined the room %s' % (format_identity(session.remote_identity), format_stream_types(session.streams)), exclude=session)

    def remove_session(self, session):
        """Remove a session from the room, stop observing it and announce the departure."""
        notification_center = NotificationCenter()
        chat_stream = self._find_stream(session, 'chat')
        if chat_stream is not None:
            notification_center.remove_observer(self, sender=chat_stream)
        audio_stream = self._find_stream(session, 'audio')
        if audio_stream is not None:
            notification_center.remove_observer(self, sender=audio_stream)
            try:
                self.audio_conference.remove(audio_stream)
            except ValueError:
                # User may hangup before getting bridged into the conference
                pass
            if len(self.audio_conference.streams) == 0:
                self.audio_conference.hold()
        notification_center.remove_observer(self, sender=session)
        self.sessions.remove(session)
        self.get_conference_info()
        log.msg(u'%s left conference %s after %s' % (format_identity(session.remote_identity), self.uri, format_session_duration(session)))
        if not self.sessions:
            log.msg(u'Last participant left conference %s' % self.uri)
        # Announce the departure only if the participant has no other session in the room
        if str(session.remote_identity.uri) not in set(str(s.remote_identity.uri) for s in self.sessions if s is not session):
            self.dispatch_server_message('%s has left the room after %s' % (format_identity(session.remote_identity), format_session_duration(session)))

    def accept_proposal(self, session, streams):
        """Accept the pending stream proposal of `session`, if it is still pending."""
        if session in self.sessions_with_proposals:
            session.accept_proposal(streams)
            self.sessions_with_proposals.remove(session)

    def handle_incoming_subscription(self, subscribe_request, data):
        """Accept subscriptions to the 'conference' event and reject any other with 489."""
        if subscribe_request.event != 'conference':
            subscribe_request.reject(489)
            return
        NotificationCenter().add_observer(self, sender=subscribe_request)
        subscribe_request.accept()
        self.subscriptions.append(subscribe_request)
        self.get_conference_info()

    @run_in_twisted_thread
    def handle_notification(self, notification):
        """Dispatch to the `_NH_<name>` method matching the notification, if any."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPIncomingSubscriptionDidEnd(self, notification):
        subscription = notification.sender
        NotificationCenter().remove_observer(self, sender=subscription)
        self.subscriptions.remove(subscription)

    def _NH_SIPSessionDidChangeHoldState(self, notification):
        session = notification.sender
        if notification.data.originator == 'remote':
            if notification.data.on_hold:
                log.msg(u'%s has put the audio session on hold' % format_identity(session.remote_identity))
            else:
                log.msg(u'%s has taken the audio session out of hold' % format_identity(session.remote_identity))
            self.get_conference_info()

    def _NH_SIPSessionNewProposal(self, notification):
        """Queue remote proposals containing audio or chat for acceptance; reject all others."""
        if notification.data.originator != 'remote':
            return
        session = notification.sender
        audio_streams = [stream for stream in notification.data.proposed_streams if stream.type == 'audio']
        chat_streams = [stream for stream in notification.data.proposed_streams if stream.type == 'chat']
        if not audio_streams and not chat_streams:
            session.reject_proposal()
            return
        if chat_streams:
            chat_streams[0].chatroom_capabilities = []
        # Accept at most one stream of each supported type
        streams = [streams[0] for streams in (audio_streams, chat_streams) if streams]
        self.sessions_with_proposals.append(session)
        reactor.callLater(4, self.accept_proposal, session, streams)

    def _NH_SIPSessionProposalRejected(self, notification):
        session = notification.sender
        # A proposal rejected in _NH_SIPSessionNewProposal was never queued
        if session in self.sessions_with_proposals:
            self.sessions_with_proposals.remove(session)

    def _NH_SIPSessionDidRenegotiateStreams(self, notification):
        """Follow the streams added to and removed from a session."""
        session = notification.sender
        for stream in notification.data.added_streams:
            notification.center.add_observer(self, sender=stream)
            log.msg(u'%s has added %s to %s' % (format_identity(session.remote_identity), stream.type, self.uri))
            self.dispatch_server_message('%s has added %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
            if stream.type == 'audio':
                log.msg(u'Audio stream using %s/%sHz, end-points: %s:%d <-> %s:%d' % (stream.codec, stream.sample_rate,
                                                                                    stream.local_rtp_address, stream.local_rtp_port,
                                                                                    stream.remote_rtp_address, stream.remote_rtp_port))
                welcome_handler = WelcomeHandler(self, session)
                welcome_handler.start(welcome_prompt=False)
        for stream in notification.data.removed_streams:
            notification.center.remove_observer(self, sender=stream)
            log.msg(u'%s has removed %s from %s' % (format_identity(session.remote_identity), stream.type, self.uri))
            self.dispatch_server_message('%s has removed %s' % (format_identity(session.remote_identity), stream.type), exclude=session)
            if stream.type == 'audio':
                try:
                    self.audio_conference.remove(stream)
                except ValueError:
                    # User may hangup before getting bridged into the conference
                    pass
                if len(self.audio_conference.streams) == 0:
                    self.audio_conference.hold()
        if not session.streams:
            log.msg(u'%s has removed all streams from %s, session will be terminated' % (format_identity(session.remote_identity), self.uri))
            session.end()
        self.get_conference_info()

    def _NH_RTPStreamDidTimeout(self, notification):
        """End the session whose only stream is an audio stream that timed out."""
        stream = notification.sender
        if stream.type != 'audio':
            return
        session = stream.session
        log.msg(u'Audio stream for session %s timed out' % format_identity(session.remote_identity))
        if session.streams == [stream]:
            session.end()

    def _NH_ChatStreamGotMessage(self, notification):
        """Acknowledge a chat message and relay it to the other participants and to IRC."""
        stream = notification.sender
        message = notification.data.message
        if message.content_type not in ('text/html', 'text/plain'):
            log.msg(u'Unsupported content type: %s, ignoring message' % message.content_type)
            stream.msrp_session.send_report(notification.data.chunk, 413, 'Unwanted message')
            return
        stream.msrp_session.send_report(notification.data.chunk, 200, 'OK')
        # Send MSRP chat message to other participants
        session = stream.session
        self.incoming_message_queue.send((session, 'msrp_message', message))
        # Send MSRP chat message to IRC chat room
        # (only text/html and text/plain get past the check above)
        if message.content_type == 'text/html':
            body = html2text(message.body)
        else:
            body = message.body
        irc_message = '%s: %s' % (format_identity(message.sender), body)
        if self.irc_protocol is not None:
            self.irc_protocol.send_message(irc_message.encode('utf-8'))
        else:
            self.pending_messages.append(irc_message)

    def _NH_ChatStreamGotNicknameRequest(self, notification):
        # Discard the nickname but pretend we accept it so that XMPP clients can work
        chunk = notification.data.chunk
        notification.sender.accept_nickname(chunk)

    def _NH_IRCBotGotConnected(self, notification):
        self.irc_protocol = notification.data.protocol
        # Send enqueued messages
        while self.pending_messages:
            message = self.pending_messages.pop(0)
            self.irc_protocol.send_message(message.encode('utf-8'))
        # Update participants list
        self.get_conference_info()

    def _NH_IRCBotGotDisconnected(self, notification):
        self.irc_protocol = None

    def _NH_IRCBotGotMessage(self, notification):
        message = notification.data.message
        self.incoming_message_queue.send((None, 'irc_message', message))

    def _NH_IRCBotGotParticipantsList(self, notification):
        self.dispatch_conference_info(notification.data.participants)

    def _NH_IRCBotJoinedChannel(self, notification):
        self.get_conference_info()

    def _NH_IRCBotUserJoined(self, notification):
        self.dispatch_server_message('%s joined the IRC channel' % notification.data.user)
        self.get_conference_info()

    def _NH_IRCBotUserLeft(self, notification):
        self.dispatch_server_message('%s left the IRC channel' % notification.data.user)
        self.get_conference_info()

    def _NH_IRCBotUserQuit(self, notification):
        self.dispatch_server_message('%s quit the IRC channel: %s' % (notification.data.user, notification.data.reason))
        self.get_conference_info()

    def _NH_IRCBotUserKicked(self, notification):
        data = notification.data
        self.dispatch_server_message('%s kicked %s out of the IRC channel: %s' % (data.kicker, data.kickee, data.reason))
        self.get_conference_info()

    def _NH_IRCBotUserRenamed(self, notification):
        self.dispatch_server_message('%s changed his name to %s' % (notification.data.oldname, notification.data.newname))
        self.get_conference_info()

    def _NH_IRCBotUserAction(self, notification):
        self.dispatch_server_message('%s %s' % (notification.data.user, notification.data.action))
# Plays the audio announcements for a session that joins a room and then adds
# the session's audio stream to the room's audio conference.
# NOTE: this block is part of a diff; lines starting with '-' are the removed
# ResourcePath(...).normalized lookups and lines starting with '+' are the
# Resources.get(...) lookups replacing them.
class WelcomeHandler(object):
implements(IObserver)
# room: the room the session joins; session: the joining session.
# proc holds the spawned playback proc while start() is running.
def __init__(self, room, session):
self.room = room
self.session = session
self.proc = None
# Observes the session, spawns play_audio_welcome as a proc and waits for it,
# then stops observing and drops the references to session, room and proc.
# welcome_prompt is passed through to play_audio_welcome.
@run_in_green_thread
def start(self, welcome_prompt=True):
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self.session)
self.proc = proc.spawn(self.play_audio_welcome, welcome_prompt)
self.proc.wait()
notification_center.remove_observer(self, sender=self.session)
self.session = None
self.room = None
self.proc = None
# Plays `file` in `player` with `delay` as the player's pause_time and waits
# for playback to end; a WavePlayerError is logged as a warning, not raised.
def play_file_in_player(self, player, file, delay):
player.filename = file
player.pause_time = delay
try:
player.play().wait()
except WavePlayerError, e:
log.warning(u"Error playing file %s: %s" % (file, e))
# Returns immediately when the session has no audio stream. Otherwise a
# WavePlayer is added to the stream's bridge, the welcome prompt (only if
# welcome_prompt is true), the participant count announcement and the
# connected tone are played, and the stream is then added to the room's audio
# conference, which is taken out of hold. The player is stopped in all cases.
def play_audio_welcome(self, welcome_prompt):
try:
audio_stream = next(stream for stream in self.session.streams if stream.type == 'audio')
except StopIteration:
return
player = WavePlayer(audio_stream.mixer, '', pause_time=1, initial_delay=1, volume=50)
audio_stream.bridge.add(player)
try:
if welcome_prompt:
- file = ResourcePath('sounds/co_welcome_conference.wav').normalized
+ file = Resources.get('sounds/co_welcome_conference.wav')
self.play_file_in_player(player, file, 1)
# user_count: distinct remote URIs of room sessions that have an audio stream,
# not counting the URI of this session
user_count = len(set(str(s.remote_identity.uri) for s in self.room.sessions if any(stream for stream in s.streams if stream.type == 'audio')) - set([str(self.session.remote_identity.uri)]))
if user_count == 0:
- file = ResourcePath('sounds/co_only_one.wav').normalized
+ file = Resources.get('sounds/co_only_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count == 1:
- file = ResourcePath('sounds/co_there_is_one.wav').normalized
+ file = Resources.get('sounds/co_there_is_one.wav')
self.play_file_in_player(player, file, 0.5)
elif user_count < 100:
- file = ResourcePath('sounds/co_there_are.wav').normalized
+ file = Resources.get('sounds/co_there_are.wav')
self.play_file_in_player(player, file, 0.2)
# Counts up to 24 are announced with a single bi_<count>.wav file
if user_count <= 24:
- file = ResourcePath('sounds/bi_%d.wav' % user_count).normalized
+ file = Resources.get('sounds/bi_%d.wav' % user_count)
self.play_file_in_player(player, file, 0.1)
else:
# Larger counts are announced as a tens file followed by a units file
# NOTE(review): user_count / 10 relies on integer division (the file uses Python 2 syntax) - confirm
- file = ResourcePath('sounds/bi_%d0.wav' % (user_count / 10)).normalized
+ file = Resources.get('sounds/bi_%d0.wav' % (user_count / 10))
self.play_file_in_player(player, file, 0.1)
- file = ResourcePath('sounds/bi_%d.wav' % (user_count % 10)).normalized
+ file = Resources.get('sounds/bi_%d.wav' % (user_count % 10))
self.play_file_in_player(player, file, 0.1)
- file = ResourcePath('sounds/co_more_participants.wav').normalized
+ file = Resources.get('sounds/co_more_participants.wav')
self.play_file_in_player(player, file, 0)
- file = ResourcePath('sounds/connected_tone.wav').normalized
+ file = Resources.get('sounds/connected_tone.wav')
self.play_file_in_player(player, file, 0.1)
except proc.ProcExit:
# No need to remove the bridge from the stream, it's done automatically
pass
else:
audio_stream.bridge.remove(player)
self.room.audio_conference.add(audio_stream)
self.room.audio_conference.unhold()
finally:
player.stop()
# Dispatches to the _NH_<name> method matching the notification, if any.
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# The session is about to end: kill the playback proc.
# NOTE(review): assumes self.proc is set whenever this notification is delivered - confirm
def _NH_SIPSessionWillEnd(self, notification):
self.proc.kill()
class IRCBot(irc.IRCClient):
    """IRC client that joins the channel of its factory and reports what happens there.

    Every event is posted as an `IRCBot*` notification with the factory as
    sender, so that observers of the factory can follow the channel.
    """
    nickname = 'SylkServer'

    def __init__(self):
        # Nicknames gathered from RPL_NAMREPLY messages until RPL_ENDOFNAMES arrives
        self._nick_collector = []
        # The last complete list of nicknames
        self.nicks = []

    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        log.msg('Connection to IRC has been established')
        NotificationCenter().post_notification('IRCBotGotConnected', self.factory, NotificationData(protocol=self))

    def connectionLost(self, failure):
        irc.IRCClient.connectionLost(self, failure)
        NotificationCenter().post_notification('IRCBotGotDisconnected', self.factory, NotificationData())

    def signedOn(self):
        log.msg('Logging into %s channel...' % self.factory.channel)
        self.join(self.factory.channel)

    def kickedFrom(self, channel, kicker, message):
        log.msg('Got kicked from %s by %s: %s. Rejoining...' % (channel, kicker, message))
        self.join(self.factory.channel)

    def joined(self, channel):
        log.msg('Logged into %s channel' % channel)
        NotificationCenter().post_notification('IRCBotJoinedChannel', self.factory, NotificationData(channel=self.factory.channel))

    def privmsg(self, user, channel, message):
        """Post channel messages of other users as IRCBotGotMessage; decline private messages."""
        if channel == '*':
            return
        username = user.split('!', 1)[0]
        if username == self.nickname:
            return
        if channel == self.nickname:
            self.msg(username, "Sorry, I don't support private messages, I'm a bot.")
            return
        uri = SIPURI.parse('sip:%s@%s' % (urllib.quote(username), self.factory.config.server[0]))
        # Text from the channel is not guaranteed to be valid UTF-8; replace what
        # cannot be decoded instead of failing inside the protocol callback
        irc_message = IRCMessage(username, uri, message.decode('utf-8', 'replace'))
        data = NotificationData(message=irc_message)
        NotificationCenter().post_notification('IRCBotGotMessage', self.factory, data)

    def send_message(self, message):
        """Say `message` in the channel of the factory."""
        self.say(self.factory.channel, message)

    def get_participants(self):
        """Request the list of nicknames in the channel; the answer is reported by got_participants."""
        self.sendLine("NAMES #%s" % self.factory.channel)

    def got_participants(self, nicks):
        data = NotificationData(participants=nicks)
        NotificationCenter().post_notification('IRCBotGotParticipantsList', self.factory, data)

    def irc_RPL_NAMREPLY(self, prefix, params):
        """Collect usernames from this channel. Several of these
        messages may be sent to cover the channel's full nicklist.
        An RPL_ENDOFNAMES signals the end of the list.
        """
        # We just separate these into individual nicks and stuff them in
        # the nickCollector, transferred to 'nicks' when we get the RPL_ENDOFNAMES.
        for name in params[3].split():
            # Remove operator and voice prefixes
            if name[0] in '@+':
                name = name[1:]
            if name != self.nickname:
                self._nick_collector.append(name)

    def irc_RPL_ENDOFNAMES(self, prefix, params):
        """This is sent after zero or more RPL_NAMREPLY commands to
        terminate the list of users in a channel.
        """
        self.nicks = self._nick_collector
        self._nick_collector = []
        self.got_participants(self.nicks)

    def userJoined(self, user, channel):
        if channel.strip('#') == self.factory.channel:
            data = NotificationData(user=user)
            NotificationCenter().post_notification('IRCBotUserJoined', self.factory, data)

    def userLeft(self, user, channel):
        if channel.strip('#') == self.factory.channel:
            data = NotificationData(user=user)
            NotificationCenter().post_notification('IRCBotUserLeft', self.factory, data)

    def userQuit(self, user, reason):
        data = NotificationData(user=user, reason=reason)
        NotificationCenter().post_notification('IRCBotUserQuit', self.factory, data)

    def userKicked(self, kickee, channel, kicker, message):
        if channel.strip('#') == self.factory.channel:
            data = NotificationData(kickee=kickee, kicker=kicker, reason=message)
            NotificationCenter().post_notification('IRCBotUserKicked', self.factory, data)

    def userRenamed(self, oldname, newname):
        data = NotificationData(oldname=oldname, newname=newname)
        NotificationCenter().post_notification('IRCBotUserRenamed', self.factory, data)

    def action(self, user, channel, data):
        if channel.strip('#') == self.factory.channel:
            username = user.split('!', 1)[0]
            data = NotificationData(user=username, action=data)
            NotificationCenter().post_notification('IRCBotUserAction', self.factory, data)
class IRCBotFactory(protocol.ClientFactory):
    """Client factory for IRCBot connections.

    config         -- the room configuration; its `channel` is kept as `channel`
    stop_requested -- set to True by the owner to prevent reconnecting after
                      the connection is lost
    """
    protocol = IRCBot

    def __init__(self, config):
        self.stop_requested = False
        self.config = config
        self.channel = config.channel

    def clientConnectionLost(self, connector, failure):
        """Log the disconnect and reconnect, unless a stop was requested."""
        log.msg('Disconnected from IRC: %s' % failure.getErrorMessage())
        if self.stop_requested:
            return
        log.msg('Reconnecting...')
        connector.connect()

    def clientConnectionFailed(self, connector, failure):
        """Log the failed connection attempt; no new attempt is made here."""
        log.error('Connection to IRC server failed: %s' % failure.getErrorMessage())
diff --git a/sylk/applications/playback/configuration.py b/sylk/applications/playback/configuration.py
index 16dc0c4..0b96c64 100644
--- a/sylk/applications/playback/configuration.py
+++ b/sylk/applications/playback/configuration.py
@@ -1,47 +1,48 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
__all__ = ['get_config']
import os
from application.configuration import ConfigFile, ConfigSection, ConfigSetting
-from sylk.configuration.datatypes import Path, ResourcePath
+from sylk.configuration.datatypes import Path
+from sylk.resources import Resources
# Settings declared for the 'Playback' section of playback.ini.
# NOTE: this block is part of a diff; the '-' line is the removed definition of
# files_dir and the '+' line is the one replacing it.
class GeneralConfig(ConfigSection):
__cfgfile__ = 'playback.ini'
__section__ = 'Playback'
# files_dir: directory that get_config joins relative `file` values onto
- files_dir = ConfigSetting(type=Path, value=ResourcePath('sounds/playback').normalized)
+ files_dir = ConfigSetting(type=Path, value=Path(Resources.get('sounds/playback')))
# Defaults reused by PlaybackConfig below
enable_video = False
answer_delay = 1
# Settings declared for a section of playback.ini that is chosen at read time:
# no __section__ is set here, get_config passes the section name to read().
class PlaybackConfig(ConfigSection):
__cfgfile__ = 'playback.ini'
# file has no default (None); get_config joins relative values onto GeneralConfig.files_dir
file = ConfigSetting(type=Path, value=None)
# Defaults are taken from GeneralConfig at class creation time
enable_video = GeneralConfig.enable_video
answer_delay = GeneralConfig.answer_delay
class Configuration(object):
    """Plain attribute container: every key of `data` becomes an attribute of the instance."""

    def __init__(self, data):
        vars(self).update(data)
def get_config(uri):
    """Return the playback configuration for `uri`, or None if there is none.

    The general settings are read from the configuration file first. If the
    file has a section named `uri`, that section is read into PlaybackConfig
    and a Configuration snapshot of it is returned; a relative `file` setting
    is joined onto GeneralConfig.files_dir, while a missing `file` setting is
    left as None. PlaybackConfig is always reset afterwards, so no value
    leaks into the next call.
    """
    config_file = ConfigFile(PlaybackConfig.__cfgfile__)
    GeneralConfig.read(cfgfile=config_file)
    if config_file.get_section(uri) is None:
        return None
    try:
        PlaybackConfig.read(section=uri)
        playback_file = PlaybackConfig.file
        # file defaults to None, which os.path.isabs cannot handle
        if playback_file is not None and not os.path.isabs(playback_file):
            PlaybackConfig.file = os.path.join(GeneralConfig.files_dir, playback_file)
        return Configuration(dict(PlaybackConfig))
    finally:
        # PlaybackConfig is shared class-level state; restore it even on errors
        PlaybackConfig.reset()
diff --git a/sylk/configuration/__init__.py b/sylk/configuration/__init__.py
index 737b6e7..02ec556 100644
--- a/sylk/configuration/__init__.py
+++ b/sylk/configuration/__init__.py
@@ -1,78 +1,78 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
from application.configuration import ConfigSection, ConfigSetting
from application.configuration.datatypes import NetworkRangeList, StringList
from application.system import host
from sipsimple.configuration.datatypes import NonNegativeInteger, SampleRate
from sylk import configuration_filename
-from sylk.configuration.datatypes import AudioCodecs, IPAddress, NillablePath, Path, Port, PortRange, SIPProxyAddress, SRTPEncryption
+from sylk.configuration.datatypes import AudioCodecs, IPAddress, Path, Port, PortRange, SIPProxyAddress, SRTPEncryption
+from sylk.resources import Resources
from sylk.tls import Certificate, PrivateKey
# Settings declared for the 'Server' section of the file named by
# configuration_filename.
# NOTE: this block is part of a diff; lines starting with '-' are removed
# definitions and lines starting with '+' are the ones replacing them. The
# resources_dir setting is removed without a replacement.
class ServerConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'Server'
# The TLS file defaults change from NillablePath values to Paths obtained from Resources.get
- ca_file = ConfigSetting(type=NillablePath, value=NillablePath('tls/ca.crt'))
- certificate = ConfigSetting(type=NillablePath, value=NillablePath('tls/default.crt'))
+ ca_file = ConfigSetting(type=Path, value=Path(Resources.get('tls/ca.crt')))
+ certificate = ConfigSetting(type=Path, value=Path(Resources.get('tls/default.crt')))
verify_server = False
enable_bonjour = False
default_application = 'conference'
application_map = ConfigSetting(type=StringList, value=['echo:echo'])
disabled_applications = ConfigSetting(type=StringList, value='')
- extra_applications_dir = ConfigSetting(type=NillablePath, value=None)
- resources_dir = ConfigSetting(type=Path, value=None)
+ extra_applications_dir = ConfigSetting(type=Path, value=None)
trace_dir = ConfigSetting(type=Path, value=Path('var/log/sylkserver'))
# Tracing switches, all off by default
trace_core = False
trace_sip = False
trace_msrp = False
trace_notifications = False
# SIP signalling options, read from the 'SIP' section of the file named by
# configuration_filename.
class SIPConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'SIP'
# Defaults to the address reported by application.system.host.default_ip.
local_ip = ConfigSetting(type=IPAddress, value=IPAddress(host.default_ip))
# Default listening ports: 5060 for UDP and TCP, 5061 for TLS.
local_udp_port = ConfigSetting(type=Port, value=5060)
local_tcp_port = ConfigSetting(type=Port, value=5060)
local_tls_port = ConfigSetting(type=Port, value=5061)
advertised_ip = ConfigSetting(type=IPAddress, value=None)
outbound_proxy = ConfigSetting(type=SIPProxyAddress, value=None)
trusted_peers = ConfigSetting(type=NetworkRangeList, value=NetworkRangeList('any'))
enable_ice = False
# MSRP options, read from the 'MSRP' section of the file named by
# configuration_filename. TLS is enabled by default.
class MSRPConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'MSRP'
use_tls = True
# RTP media options, read from the 'RTP' section of the file named by
# configuration_filename.
class RTPConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'RTP'
audio_codecs = ConfigSetting(type=AudioCodecs, value=['opus', 'speex', 'G722', 'PCMA', 'PCMU'])
# Written as 'start:end'; parsed by the PortRange datatype.
port_range = ConfigSetting(type=PortRange, value=PortRange('50000:50500'))
# One of the values accepted by the SRTPEncryption datatype.
srtp_encryption = ConfigSetting(type=SRTPEncryption, value='opportunistic')
timeout = ConfigSetting(type=NonNegativeInteger, value=30)
sample_rate = ConfigSetting(type=SampleRate, value=32000)
zrtp_cache_dir = ConfigSetting(type=Path, value=Path('var/spool/sylkserver'))
# Options read from the 'ThorNetwork' section of the file named by
# configuration_filename. The feature is disabled by default.
class ThorNodeConfig(ConfigSection):
__cfgfile__ = configuration_filename
__section__ = 'ThorNetwork'
enabled = False
domain = "sipthor.net"
multiply = 1000
# TLS material is parsed by the Certificate and PrivateKey types imported from
# sylk.tls; none is configured by default.
certificate = ConfigSetting(type=Certificate, value=None)
private_key = ConfigSetting(type=PrivateKey, value=None)
ca = ConfigSetting(type=Certificate, value=None)
diff --git a/sylk/configuration/datatypes.py b/sylk/configuration/datatypes.py
index 480df86..9204844 100644
--- a/sylk/configuration/datatypes.py
+++ b/sylk/configuration/datatypes.py
@@ -1,239 +1,179 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
import os
import re
import socket
import sys
import urllib
import urlparse
from application.python.descriptor import classproperty
from application.system import host
from sipsimple.configuration.datatypes import AudioCodecList, Hostname, SIPTransport
class AudioCodecs(list):
    """Configuration datatype for a list of audio codec names.

    Calling the class never yields an AudioCodecs instance: __new__ returns a
    plain list with the names found in AudioCodecList.available_values, or
    None when no usable name is left.
    """

    def __new__(cls, value):
        """Filter value down to the available codec names.

        value may be a tuple/list of names or a comma separated string; the
        strings 'none' (any case) and '' mean "no codecs" and give None.
        Raises TypeError for any other kind of value.
        """
        if isinstance(value, (tuple, list)):
            selected = [str(name) for name in value if name in AudioCodecList.available_values]
        elif isinstance(value, basestring):
            if value.lower() in ('none', ''):
                return None
            names = re.split(r'\s*,\s*', value)
            selected = [name for name in names if name in AudioCodecList.available_values]
        else:
            raise TypeError("value must be a string, list or tuple")
        # An empty selection is reported as None rather than an empty list.
        return selected or None
class IPAddress(str):
    """An IP address string, validated with socket.inet_aton on construction."""

    def __new__(cls, value):
        """Return value as an IPAddress.

        Raises ValueError when socket.inet_aton rejects the text and TypeError
        when value is not a string.
        """
        try:
            socket.inet_aton(value)
        except TypeError:
            raise TypeError("value must be a string")
        except socket.error:
            raise ValueError("invalid IP address: %r" % value)
        return super(IPAddress, cls).__new__(cls, value)

    @property
    def normalized(self):
        """The address as a plain str, with the wildcard '0.0.0.0' replaced by
        the host's default IP, or '127.0.0.1' when no default IP is known."""
        if self != '0.0.0.0':
            return str(self)
        return host.default_ip or '127.0.0.1'
-class ResourcePath(object):
- def __init__(self, path):
- self.path = os.path.normpath(str(path))
-
- def __getstate__(self):
- return unicode(self.path)
-
- def __setstate__(self, state):
- self.__init__(state)
-
- @property
- def normalized(self):
- path = os.path.expanduser(self.path)
- if os.path.isabs(path):
- return os.path.realpath(path)
- return os.path.realpath(os.path.join(self.resources_directory, path))
-
- @classproperty
- def resources_directory(cls):
- from sylk.configuration import ServerConfig
- if ServerConfig.resources_dir is not None:
- return os.path.realpath(ServerConfig.resources_dir)
- else:
- binary_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
- if os.path.basename(binary_directory) == 'bin':
- application_directory = os.path.dirname(binary_directory)
- resources_component = 'share/sylkserver'
- else:
- application_directory = binary_directory
- resources_component = 'resources'
- return os.path.realpath(os.path.join(application_directory, resources_component))
-
- def __eq__(self, other):
- try:
- return self.path == other.path
- except AttributeError:
- return False
-
- def __hash__(self):
- return hash(self.path)
-
- def __repr__(self):
- return '%s(%r)' % (self.__class__.__name__, self.path)
-
- def __unicode__(self):
- return unicode(self.path)
-
-
class Port(int):
    """Configuration datatype for a port number in the [0, 65535] range.

    Calling the class returns a plain int, or None when the value cannot be
    parsed as an integer.
    """

    def __new__(cls, value):
        """Convert value to a port number.

        Returns None when int(value) raises ValueError; raises ValueError when
        the number is outside [0, 65535].
        """
        try:
            number = int(value)
        except ValueError:
            return None
        if number < 0 or number > 65535:
            raise ValueError("illegal port value: %s" % number)
        return number
class PortRange(object):
    """A port range in the form start:end with start and end being even numbers in the [1024, 65536] range"""

    def __init__(self, value):
        """Parse value ('start:end') into the start and end attributes.

        Raises ValueError when value does not have two integer parts, when a
        bound is odd or outside [1024, 65536], or when start is not lower
        than end.
        """
        self.start, self.end = [int(p) for p in value.split(':', 1)]
        # Plain comparisons instead of membership in xrange(1024, 65537, 2):
        # same accepted values, without scanning the range for every bound.
        bounds_ok = all(1024 <= port <= 65536 and port % 2 == 0 for port in (self.start, self.end))
        if not (bounds_ok and self.start < self.end):
            raise ValueError("bad range: %r: ports must be even numbers in the range [1024, 65536] with start < end" % value)
class SIPProxyAddress(object):
    """A SIP proxy address written as host[:port][;transport=name].

    The port defaults to 5060 and the transport to 'udp' when they are left
    out of the description.
    """

    _description_re = re.compile(r"^(?P<host>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+)*))(:(?P<port>\d+))?(;transport=(?P<transport>.+))?$")

    def __new__(cls, description):
        """Return None for an empty description, otherwise a new instance.

        Raises ValueError when the description does not match the expected
        syntax.
        """
        if not description:
            return None
        if cls._description_re.match(description) is None:
            raise ValueError("illegal SIP proxy address: %s" % description)
        return super(SIPProxyAddress, cls).__new__(cls)

    def __init__(self, description):
        """Fill in host, port and transport from description.

        Raises ValueError when the port is 0.
        """
        fields = self.__class__._description_re.match(description).groupdict()
        self.host = Hostname(fields.get('host'))
        self.port = Port(fields.get('port', None) or 5060)
        if self.port == 0:
            raise ValueError("illegal port value: 0")
        self.transport = SIPTransport(fields.get('transport', None) or 'udp')

    def __getstate__(self):
        # The state is the textual form of the address.
        return unicode(self)

    def __setstate__(self, state):
        # Validate the text again before re-running __init__ on it.
        if self.__class__._description_re.match(state) is None:
            raise ValueError("illegal SIP proxy address: %s" % state)
        self.__init__(state)

    def __eq__(self, other):
        # Objects lacking host/port/transport attributes compare unequal.
        try:
            mine = (self.host, self.port, self.transport)
            theirs = (other.host, other.port, other.transport)
            return mine == theirs
        except AttributeError:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash((self.host, self.port, self.transport))

    def __unicode__(self):
        return u'%s:%d;transport=%s' % (self.host, self.port, self.transport)
-class NillablePath(unicode):
- def __new__(cls, path):
- path = os.path.normpath(path)
- if not os.path.exists(path):
- return None
- return unicode.__new__(cls, path)
-
- @property
- def normalized(self):
- return os.path.expanduser(self)
-
-
class Path(unicode):
    """A filesystem path stored in os.path.normpath form."""

    def __new__(cls, path):
        return super(Path, cls).__new__(cls, os.path.normpath(path))

    @property
    def normalized(self):
        """The path with a leading '~' expanded by os.path.expanduser."""
        return os.path.expanduser(self)
class URL(object):
"""A class describing an URL and providing access to its elements"""
def __init__(self, url):
scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
if netloc:
if "@" in netloc:
userinfo, hostport = netloc.split("@", 1)
if ":" in userinfo:
username, password = userinfo.split(":", 1)
else:
username, password = userinfo, None
else:
username = password = None
hostport = netloc
if ':' in hostport:
host, port = hostport.split(':', 1)
else:
host, port = hostport, None
else:
username = password = host = port = None
self.original_url = url
self.scheme = scheme
self.username = username
self.password = password
self.host = host
self.port = int(port) if port is not None else None
self.path = urllib.url2pathname(path)
self.query_items = dict(urlparse.parse_qsl(query))
self.fragment = fragment
def __str__(self):
return urlparse.urlunsplit((self.scheme, self.netloc, urllib.pathname2url(self.path), self.query, self.fragment))
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.__str__())
url = property(__str__)
@property
def query(self):
return urllib.urlencode(self.query_items)
@property
def netloc(self):
authinfo = ':'.join(str(x) for x in (self.username, self.password) if x is not None) or None
hostport = ':'.join(str(x) for x in (self.host or '', self.port) if x is not None)
return '@'.join(x for x in (authinfo, hostport) if x is not None)
class SRTPEncryption(str):
    """Configuration datatype restricted to the names in available_values.

    Calling the class returns a plain str, not an SRTPEncryption instance.
    """

    available_values = ('opportunistic', 'sdes', 'zrtp', 'disabled')

    def __new__(cls, value):
        """Return str(value) when it is an allowed name, else raise ValueError."""
        text = str(value)
        if text in cls.available_values:
            return text
        raise ValueError("illegal value for SRTP encryption: %s" % text)
diff --git a/sylk/configuration/settings.py b/sylk/configuration/settings.py
index 4c4a3c2..e249b84 100644
--- a/sylk/configuration/settings.py
+++ b/sylk/configuration/settings.py
@@ -1,155 +1,166 @@
# Copyright (C) 2010-2011 AG Projects. See LICENSE for details.
#
"""
SIP SIMPLE SDK settings extensions.
"""
__all__ = ['AccountExtension', 'BonjourAccountExtension', 'SylkServerSettingsExtension']
+import os
+
+from application import log
from sipsimple.account import MSRPSettings as AccountMSRPSettings, NATTraversalSettings as AccountNATTraversalSettings
from sipsimple.account import RTPSettings as AccountRTPSettings, SIPSettings as AccountSIPSettings, TLSSettings as AccountTLSSettings, SRTPEncryptionSettings as AccountSRTPEncryptionSettings
from sipsimple.account import MessageSummarySettings as AccountMessageSummarySettings, PresenceSettings as AccountPresenceSettingss, XCAPSettings as AccountXCAPSettings
from sipsimple.configuration import CorrelatedSetting, Setting, SettingsObjectExtension
from sipsimple.configuration.datatypes import MSRPConnectionModel, MSRPTransport, NonNegativeInteger, PortRange, SampleRate, SIPTransportList, SRTPKeyNegotiation
from sipsimple.configuration.settings import AudioSettings, EchoCancellerSettings, LogsSettings, RTPSettings, SIPSettings, TLSSettings
from sylk import __version__ as server_version
from sylk.configuration import ServerConfig, SIPConfig, MSRPConfig, RTPConfig
-from sylk.configuration.datatypes import AudioCodecs, NillablePath, Path, Port, SIPProxyAddress
+from sylk.configuration.datatypes import AudioCodecs, Path, Port, SIPProxyAddress
# Account settings extensions
# Account message summary settings with 'enabled' defaulting to False.
class AccountMessageSummarySettingsExtension(AccountMessageSummarySettings):
enabled = Setting(type=bool, default=False)
# Account MSRP defaults derived from the server configuration.
class AccountMSRPSettingsExtension(AccountMSRPSettings):
# 'tls' when MSRPConfig.use_tls is set, otherwise 'tcp'.
transport = Setting(type=MSRPTransport, default='tls' if MSRPConfig.use_tls else 'tcp')
# 'relay' when ServerConfig.enable_bonjour is set, otherwise 'acm'.
connection_model = Setting(type=MSRPConnectionModel, default='relay' if ServerConfig.enable_bonjour else 'acm')
# Account NAT traversal defaults: ICE follows SIPConfig.enable_ice and the MSRP
# relay is not used for outbound sessions.
class AccountNATTraversalSettingsExtension(AccountNATTraversalSettings):
use_ice = Setting(type=bool, default=SIPConfig.enable_ice)
use_msrp_relay_for_outbound = Setting(type=bool, default=False)
# Account presence settings with 'enabled' defaulting to False. The doubled
# 's' in the names follows the AccountPresenceSettingss import alias.
class AccountPresenceSettingssExtension(AccountPresenceSettingss):
enabled = Setting(type=bool, default=False)
# Translate the server-wide SRTP setting into the account key negotiation
# default. 'disabled' maps to 'opportunistic' only as a placeholder: it doesn't
# matter because encryption is disabled in that case. Any value without an
# entry in the table is passed through unchanged.
srtp_key_negotiation = {
    'disabled': 'opportunistic',
    'sdes': 'sdes_optional',
}.get(RTPConfig.srtp_encryption, RTPConfig.srtp_encryption)
# Account SRTP defaults: enabled unless RTPConfig.srtp_encryption is
# 'disabled'; the key negotiation default is the module-level
# srtp_key_negotiation value.
class AccountSRTPEncryptionSettingsExtension(AccountSRTPEncryptionSettings):
enabled = Setting(type=bool, default=RTPConfig.srtp_encryption!='disabled')
key_negotiation = Setting(type=SRTPKeyNegotiation, default=srtp_key_negotiation)
# Account RTP settings: no per-account codec list by default (None), with the
# SRTP settings group defined in this module attached as 'encryption'.
class AccountRTPSettingsExtension(AccountRTPSettings):
audio_codec_list = Setting(type=AudioCodecs, default=None, nillable=True)
encryption = AccountSRTPEncryptionSettingsExtension
# Account SIP defaults: registration is off and the outbound proxy is taken
# from SIPConfig.outbound_proxy (which may be None).
class AccountSIPSettingsExtension(AccountSIPSettings):
register = Setting(type=bool, default=False)
outbound_proxy = Setting(type=SIPProxyAddress, default=SIPConfig.outbound_proxy, nillable=True)
# Use the configured certificate only when it names an existing file; otherwise
# fall back to None.
+account_cert = ServerConfig.certificate
+if account_cert is not None and not os.path.isfile(account_cert):
+ account_cert = None
+
# Account TLS defaults taken from the server configuration.
class AccountTLSSettingsExtension(AccountTLSSettings):
# The default changes from ServerConfig.certificate (NillablePath) to the
# module-level account_cert value (Path).
- certificate = Setting(type=NillablePath, default=ServerConfig.certificate, nillable=True)
+ certificate = Setting(type=Path, default=account_cert, nillable=True)
verify_server = Setting(type=bool, default=ServerConfig.verify_server)
# Account XCAP settings with 'enabled' defaulting to False.
class AccountXCAPSettingsExtension(AccountXCAPSettings):
enabled = Setting(type=bool, default=False)
# Groups the account settings extensions defined in this module; accounts are
# enabled by default.
class AccountExtension(SettingsObjectExtension):
enabled = Setting(type=bool, default=True)
message_summary = AccountMessageSummarySettingsExtension
msrp = AccountMSRPSettingsExtension
nat_traversal = AccountNATTraversalSettingsExtension
presence = AccountPresenceSettingssExtension
rtp = AccountRTPSettingsExtension
sip = AccountSIPSettingsExtension
tls = AccountTLSSettingsExtension
xcap = AccountXCAPSettingsExtension
# Bonjour account extension with 'enabled' defaulting to False.
class BonjourAccountExtension(SettingsObjectExtension):
enabled = Setting(type=bool, default=False)
# General settings extensions
# Echo canceller defaults: disabled, with a tail length of 0.
class EchoCancellerSettingsExtension(EchoCancellerSettings):
enabled = Setting(type=bool, default=False)
tail_length = Setting(type=NonNegativeInteger, default=0)
class AudioSettingsExtension(AudioSettings):
    # No input or output device is selected by default.
    input_device = Setting(type=str, default=None, nillable=True)
    output_device = Setting(type=str, default=None, nillable=True)
    # The sample rate default follows the RTP section of the server configuration.
    sample_rate = Setting(type=SampleRate, default=RTPConfig.sample_rate)
    # Attach the extension group defined in this module rather than the base
    # EchoCancellerSettings, so that its defaults (disabled, tail length 0)
    # are the ones that apply.
    echo_canceller = EchoCancellerSettingsExtension
# Logging defaults mirrored from the trace_* options of ServerConfig.
class LogsSettingsExtension(LogsSettings):
directory = Setting(type=Path, default=ServerConfig.trace_dir)
trace_sip = Setting(type=bool, default=ServerConfig.trace_sip)
trace_msrp = Setting(type=bool, default=ServerConfig.trace_msrp)
# The trace_pjsip setting is driven by the trace_core server option.
trace_pjsip = Setting(type=bool, default=ServerConfig.trace_core)
trace_notifications = Setting(type=bool, default=ServerConfig.trace_notifications)
# Global RTP defaults mirrored from RTPConfig.
class RTPSettingsExtension(RTPSettings):
audio_codec_list = Setting(type=AudioCodecs, default=RTPConfig.audio_codecs)
# PortRange here is the type imported from sipsimple.configuration.datatypes;
# it is rebuilt from the start/end of the configured range.
port_range = Setting(type=PortRange, default=PortRange(RTPConfig.port_range.start, RTPConfig.port_range.end))
timeout = Setting(type=NonNegativeInteger, default=RTPConfig.timeout)
# Use the configured CA file only when it names an existing file; otherwise
# fall back to None.
+ca_file = ServerConfig.ca_file
+if ca_file is not None and not os.path.isfile(ca_file):
+ ca_file = None
+
# Global TLS settings: the CA list defaults to the module-level ca_file value,
# which may be None.
+class TLSSettingsExtension(TLSSettings):
+ ca_list = Setting(type=Path, default=ca_file, nillable=True)
+
+
def sip_port_validator(port, sibling_port):
    """Reject a TCP/TLS port that collides with its sibling port.

    Raises ValueError when port is an enabled port (neither None nor 0) and
    equals sibling_port. A disabled port never collides, so two ports that
    are both None or both 0 are accepted.
    """
    # The truthiness test skips both 0 and None; comparing None == None would
    # otherwise be reported as a collision between two disabled transports.
    if port and port == sibling_port:
        raise ValueError("the TCP and TLS ports must be different")
# Build the list of SIP transports to enable: a transport is included when its
# local port in SIPConfig is not None.
transport_list = []
if SIPConfig.local_udp_port is not None:
transport_list.append('udp')
if SIPConfig.local_tcp_port is not None:
transport_list.append('tcp')
# New behaviour: TLS is dropped, with a warning, when either ca_file or
# account_cert is None.
-if SIPConfig.local_tls_port is not None:
+tls_port = SIPConfig.local_tls_port
+if tls_port is not None and None in (ca_file, account_cert):
+ log.warning('Cannot enable TLS because the CA or the certificate are not specified')
+ tls_port = None
+if tls_port is not None:
transport_list.append('tls')
# The old code replaced unset ports with 0; these fallbacks are removed.
-udp_port = SIPConfig.local_udp_port or 0
-tcp_port = SIPConfig.local_tcp_port or 0
-tls_port = SIPConfig.local_tls_port or 0
-
# Global SIP settings. The port defaults change from the 0-substituted
# module-level values to the SIPConfig values (and the module-level tls_port),
# and all three ports become nillable.
class SIPSettingsExtension(SIPSettings):
- udp_port = Setting(type=Port, default=udp_port)
- tcp_port = CorrelatedSetting(type=Port, sibling='tls_port', validator=sip_port_validator, default=tcp_port)
- tls_port = CorrelatedSetting(type=Port, sibling='tcp_port', validator=sip_port_validator, default=tls_port)
+ udp_port = Setting(type=Port, default=SIPConfig.local_udp_port, nillable=True)
# tcp_port and tls_port are declared as siblings and share sip_port_validator.
+ tcp_port = CorrelatedSetting(type=Port, sibling='tls_port', validator=sip_port_validator, default=SIPConfig.local_tcp_port, nillable=True)
+ tls_port = CorrelatedSetting(type=Port, sibling='tcp_port', validator=sip_port_validator, default=tls_port, nillable=True)
transport_list = Setting(type=SIPTransportList, default=transport_list)
-class TLSSettingsExtension(TLSSettings):
- ca_list = Setting(type=NillablePath, default=ServerConfig.ca_file, nillable=True)
-
-
# Groups the general settings extensions defined in this module.
class SylkServerSettingsExtension(SettingsObjectExtension):
# The user agent string is 'SylkServer-' followed by the sylk package version.
user_agent = Setting(type=str, default='SylkServer-%s' % server_version)
audio = AudioSettingsExtension
logs = LogsSettingsExtension
rtp = RTPSettingsExtension
sip = SIPSettingsExtension
tls = TLSSettingsExtension
diff --git a/sylk/resources.py b/sylk/resources.py
new file mode 100644
index 0000000..2dcdd23
--- /dev/null
+++ b/sylk/resources.py
@@ -0,0 +1,32 @@
+# Copyright (C) 2015 AG Projects. See LICENSE for details.
+#
+
+import os
+import sys
+
+from application.python.descriptor import classproperty
+
+
# Locates SylkServer's resources directory from the location of the running
# script (sys.argv[0]) and builds paths to individual resources.
+class Resources(object):
+ """Provide access to SylkServer's resources"""
+
+ _cached_directory = None
+
# Computed on first access and kept in _cached_directory afterwards. When the
# script's real directory is named 'bin' the resources are expected in
# 'share/sylkserver' under its parent; otherwise in 'resources' next to the
# script. The result is decoded using the filesystem encoding.
+ @classproperty
+ def directory(cls):
+ if cls._cached_directory is None:
+ script = sys.argv[0]
+ binary_directory = os.path.dirname(os.path.realpath(script))
+ if os.path.basename(binary_directory) == 'bin':
+ application_directory = os.path.dirname(binary_directory)
+ resources_component = 'share/sylkserver'
+ else:
+ application_directory = binary_directory
+ resources_component = 'resources'
+ cls._cached_directory = os.path.join(application_directory, resources_component).decode(sys.getfilesystemencoding())
+ return cls._cached_directory
+
# Joins the resources directory with the given resource name; a falsy name is
# replaced by an empty path component.
+ @classmethod
+ def get(cls, resource):
+ return os.path.join(cls.directory, resource or u'')
+

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 3:15 AM (13 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3406242
Default Alt Text
(136 KB)

Event Timeline