Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/streams/msrp/__init__.py b/sipsimple/streams/msrp/__init__.py
index 2e91f495..aed97f03 100644
--- a/sipsimple/streams/msrp/__init__.py
+++ b/sipsimple/streams/msrp/__init__.py
@@ -1,407 +1,408 @@
"""
Handling of MSRP media streams according to RFC4975, RFC4976, RFC5547 and RFC3994.
"""
__all__ = ['MSRPStreamError', 'MSRPStreamBase']
import traceback
from application.notification import NotificationCenter, NotificationData, IObserver
from application.python import Null
from application.system import host
from twisted.internet.error import ConnectionDone
from zope.interface import implementer
from eventlib import api
from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings
from msrplib.protocol import URI
from msrplib.session import contains_mime_type
from sipsimple.account import Account, BonjourAccount
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SDPAttribute, SDPConnection, SDPMediaStream
from sipsimple.streams import IMediaStream, MediaStreamType, StreamError
from sipsimple.threading.green import run_in_green_thread
class MSRPStreamError(StreamError):
pass
@implementer(IMediaStream, IObserver)
class MSRPStreamBase(object, metaclass=MediaStreamType):
# Attributes that need to be defined by each MSRP stream type
type = None
priority = None
msrp_session_class = None
media_type = None
accept_types = None
accept_wrapped_types = None
# These attributes are always False for any MSRP stream
hold_supported = False
on_hold = False
on_hold_by_local = False
on_hold_by_remote = False
def __new__(cls, *args, **kw):
if cls is MSRPStreamBase:
raise TypeError("MSRPStreamBase cannot be instantiated directly")
return object.__new__(cls)
def __init__(self, direction='sendrecv'):
self.direction = direction
self.greenlet = None
self.local_media = None
self.remote_media = None
self.msrp = None # Placeholder for the MSRPTransport that will be set when started
self.msrp_connector = None
self.cpim_enabled = None # Boolean value. None means it was not negotiated yet
self.session = None
self.msrp_session = None
self.shutting_down = False
self.local_role = None
self.remote_role = None
self.transport = None
self.remote_accept_types = None
self.remote_accept_wrapped_types = None
self._initialize_done = False
self._done = False
self._failure_reason = None
@property
def local_uri(self):
msrp = self.msrp or self.msrp_connector
return msrp.local_uri if msrp is not None else None
def _create_local_media(self, uri_path):
transport = "TCP/TLS/MSRP" if uri_path[-1].use_tls else "TCP/MSRP"
attributes = []
path = " ".join(str(uri) for uri in uri_path)
attributes.append(SDPAttribute(b"path", path.encode()))
if self.direction not in [None, 'sendrecv']:
attributes.append(SDPAttribute(self.direction.encode(), b''))
if self.accept_types is not None:
a_types = " ".join(self.accept_types)
attributes.append(SDPAttribute(b"accept-types", a_types.encode()))
if self.accept_wrapped_types is not None:
a_w_types = " ".join(self.accept_wrapped_types)
attributes.append(SDPAttribute(b"accept-wrapped-types", a_w_types.encode()))
attributes.append(SDPAttribute(b"setup", self.local_role.encode() if self.local_role else None))
local_ip = uri_path[-1].host
connection = SDPConnection(local_ip.encode())
return SDPMediaStream(self.media_type.encode(), uri_path[-1].port or 2855, transport.encode(), connection=connection, formats=[b"*"], attributes=attributes)
# The public API (the IMediaStream interface)
# noinspection PyUnusedLocal
def get_local_media(self, remote_sdp=None, index=0):
return self.local_media
def new_from_sdp(self, session, remote_sdp, stream_index):
raise NotImplementedError
@run_in_green_thread
def initialize(self, session, direction):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=self)
try:
self.session = session
self.transport = self.session.account.msrp.transport
outgoing = direction == 'outgoing'
logger = NotificationProxyLogger()
if self.session.account is BonjourAccount():
if outgoing:
self.msrp_connector = DirectConnector(logger=logger)
self.local_role = 'active'
else:
if self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key):
raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate")
self.msrp_connector = DirectAcceptor(logger=logger)
self.local_role = 'passive'
else:
if self.session.account.msrp.connection_model == 'relay':
if not outgoing and self.remote_role in ('actpass', 'passive'):
# 'passive' not allowed by the RFC but play nice for interoperability. -Saul
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
elif outgoing and not self.session.account.nat_traversal.use_msrp_relay_for_outbound:
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
else:
if self.session.account.nat_traversal.msrp_relay is None:
relay_host = relay_port = None
else:
if self.transport != self.session.account.nat_traversal.msrp_relay.transport:
raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting")
relay_host = self.session.account.nat_traversal.msrp_relay.host
relay_port = self.session.account.nat_traversal.msrp_relay.port
relay = MSRPRelaySettings(domain=self.session.account.uri.host,
username=self.session.account.uri.user,
password=self.session.account.credentials.password,
host=relay_host,
port=relay_port,
use_tls=self.transport=='tls')
self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True)
self.local_role = 'actpass' if outgoing else 'passive'
else:
if not outgoing and self.remote_role in ('actpass', 'passive'):
# 'passive' not allowed by the RFC but play nice for interoperability. -Saul
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.local_role = 'active'
else:
if not outgoing and self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key):
raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate")
self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True)
self.local_role = 'actpass' if outgoing else 'passive'
full_local_path = self.msrp_connector.prepare(local_uri=URI(host=host.default_ip, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials))
self.local_media = self._create_local_media(full_local_path)
except Exception as e:
traceback.print_exc()
notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=str(e)))
else:
notification_center.post_notification('MediaStreamDidInitialize', sender=self)
finally:
self._initialize_done = True
self.greenlet = None
# noinspection PyUnusedLocal
@run_in_green_thread
def start(self, local_sdp, remote_sdp, stream_index):
self.greenlet = api.getcurrent()
notification_center = NotificationCenter()
context = 'sdp_negotiation'
try:
remote_media = remote_sdp.media[stream_index]
self.remote_media = remote_media
- self.remote_accept_types = remote_media.attributes.getfirst('accept-types', '').split()
- self.remote_accept_wrapped_types = remote_media.attributes.getfirst('accept-wrapped-types', '').split()
+ self.remote_accept_types = remote_media.attributes.getfirst(b'accept-types', b'').decode().split()
+ self.remote_accept_wrapped_types = remote_media.attributes.getfirst(b'accept-wrapped-types', b'').decode().split()
self.cpim_enabled = contains_mime_type(self.accept_types, 'message/cpim') and contains_mime_type(self.remote_accept_types, 'message/cpim')
- remote_uri_path = remote_media.attributes.getfirst('path')
+ remote_uri_path = remote_media.attributes.getfirst(b'path')
if remote_uri_path is None:
raise AttributeError("remote SDP media does not have 'path' attribute")
- full_remote_path = [URI.parse(uri) for uri in remote_uri_path.split()]
+ full_remote_path = [URI.parse(uri) for uri in remote_uri_path.decode().split()]
remote_transport = 'tls' if full_remote_path[0].use_tls else 'tcp'
if self.transport != remote_transport:
raise MSRPStreamError("remote transport ('%s') different from local transport ('%s')" % (remote_transport, self.transport))
if isinstance(self.session.account, Account) and self.local_role == 'actpass':
remote_setup = remote_media.attributes.getfirst('setup', 'passive')
+ remote_setup = remote_setup.decode() if remote_setup else None
if remote_setup == 'passive':
# If actpass is offered connectors are always started as passive
# We need to switch to active if the remote answers with passive
if self.session.account.msrp.connection_model == 'relay':
self.msrp_connector.mode = 'active'
else:
local_uri = self.msrp_connector.local_uri
logger = self.msrp_connector.logger
self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True)
self.msrp_connector.prepare(local_uri)
context = 'start'
self.msrp = self.msrp_connector.complete(full_remote_path)
if self.msrp_session_class is not None:
self.msrp_session = self.msrp_session_class(self.msrp, accept_types=self.accept_types, on_incoming_cb=self._handle_incoming, automatic_reports=False)
self.msrp_connector = None
except Exception as e:
self._failure_reason = str(e)
notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context=context, reason=self._failure_reason))
else:
notification_center.post_notification('MediaStreamDidStart', sender=self)
finally:
self.greenlet = None
def deactivate(self):
self.shutting_down = True
@run_in_green_thread
def end(self):
if self._done:
return
self._done = True
notification_center = NotificationCenter()
if not self._initialize_done:
# we are in the middle of initialize()
try:
msrp_connector = self.msrp_connector
if self.greenlet is not None:
api.kill(self.greenlet)
if msrp_connector is not None:
msrp_connector.cleanup()
finally:
notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted'))
notification_center.remove_observer(self, sender=self)
self.msrp_connector = None
self.greenlet = None
else:
notification_center.post_notification('MediaStreamWillEnd', sender=self)
msrp = self.msrp
msrp_session = self.msrp_session
msrp_connector = self.msrp_connector
try:
if self.greenlet is not None:
api.kill(self.greenlet)
if msrp_session is not None:
msrp_session.shutdown()
elif msrp is not None:
msrp.loseConnection(wait=False)
if msrp_connector is not None:
msrp_connector.cleanup()
finally:
notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason))
notification_center.remove_observer(self, sender=self)
self.msrp = None
self.msrp_session = None
self.msrp_connector = None
self.session = None
self.greenlet = None
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def validate_update(self, remote_sdp, stream_index):
return True # TODO
def update(self, local_sdp, remote_sdp, stream_index):
pass # TODO
def hold(self):
pass
def unhold(self):
pass
def reset(self, stream_index):
pass
# Internal IObserver interface
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
# Internal message handlers
def _handle_incoming(self, chunk=None, error=None):
notification_center = NotificationCenter()
if error is not None:
if self.shutting_down and isinstance(error.value, ConnectionDone):
return
self._failure_reason = error.getErrorMessage()
notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason))
elif chunk is not None:
method_handler = getattr(self, '_handle_%s' % chunk.method, None)
if method_handler is not None:
method_handler(chunk)
def _handle_REPORT(self, chunk):
pass
def _handle_SEND(self, chunk):
pass
# temporary solution. to be replaced later by a better logging system in msrplib -Dan
#
class ChunkInfo(object):
__slots__ = 'content_type', 'header', 'footer', 'data'
def __init__(self, content_type, header='', footer='', data=''):
self.content_type = content_type
self.header = header
self.footer = footer
self.data = data
def __repr__(self):
return "{0.__class__.__name__}(content_type={0.content_type!r}, header={0.header!r}, footer={0.footer!r}, data={0.data!r})".format(self)
@property
def content(self):
return self.header + self.data + self.footer
@property
def normalized_content(self):
if not self.data:
return self.header + self.footer
elif self.content_type == 'message/cpim':
headers, sep, body = self.data.partition('\r\n\r\n')
if not sep:
return self.header + self.data + self.footer
mime_headers, mime_sep, mime_body = body.partition('\n\n')
if not mime_sep:
return self.header + self.data + self.footer
for mime_header in mime_headers.lower().splitlines():
if mime_header.startswith('content-type:'):
wrapped_content_type = mime_header[13:].partition(';')[0].strip()
break
else:
wrapped_content_type = None
if wrapped_content_type is None or wrapped_content_type == 'application/im-iscomposing+xml' or wrapped_content_type.startswith(('text/', 'message/')):
data = self.data
else:
data = headers + sep + mime_headers + mime_sep + '<<<stripped data>>>'
return self.header + data + self.footer
elif self.content_type is None or self.content_type == 'application/im-iscomposing+xml' or self.content_type.startswith(('text/', 'message/')):
return self.header + self.data + self.footer
else:
return self.header + '<<<stripped data>>>' + self.footer
class NotificationProxyLogger(object):
def __init__(self):
from application import log
self.level = log.level
self.notification_center = NotificationCenter()
self.log_settings = SIPSimpleSettings().logs
def received_chunk(self, data, transport):
if self.log_settings.trace_msrp:
chunk_info = ChunkInfo(data.content_type, header=data.chunk_header, footer=data.chunk_footer, data=data.data)
notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False)
self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data)
def sent_chunk(self, data, transport):
if self.log_settings.trace_msrp:
chunk_info = ChunkInfo(data.content_type, header=data.encoded_header, footer=data.encoded_footer, data=data.data)
notification_data = NotificationData(direction='outgoing', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False)
self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data)
def received_illegal_data(self, data, transport):
if self.log_settings.trace_msrp:
notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=data, illegal=True)
self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data)
def debug(self, message, *args, **kw):
pass
def info(self, message, *args, **kw):
if self.log_settings.trace_msrp:
self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.INFO))
def warning(self, message, *args, **kw):
if self.log_settings.trace_msrp:
self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.WARNING))
warn = warning
def error(self, message, *args, **kw):
if self.log_settings.trace_msrp:
self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.ERROR))
def exception(self, message='', *args, **kw):
if self.log_settings.trace_msrp:
message = message % args if args else message
exception = traceback.format_exc()
self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message + '\n' + exception if message else exception, level=self.level.ERROR))
def critical(self, message, *args, **kw):
if self.log_settings.trace_msrp:
self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.CRITICAL))
fatal = critical
from sipsimple.streams.msrp import chat, filetransfer, screensharing
diff --git a/sipsimple/streams/msrp/chat.py b/sipsimple/streams/msrp/chat.py
index 36d33cc6..437b2a49 100644
--- a/sipsimple/streams/msrp/chat.py
+++ b/sipsimple/streams/msrp/chat.py
@@ -1,900 +1,900 @@
"""
This module provides classes to parse and generate SDP related to SIP sessions that negotiate Instant Messaging, including CPIM as defined in RFC3862
"""
import pickle as pickle
import codecs
import os
import random
import re
from application.python.descriptor import WriteOnceAttribute
from application.notification import IObserver, NotificationCenter, NotificationData
from application.python import Null
from application.python.types import Singleton
from application.system import openfile
from collections import defaultdict
from email.message import Message as EmailMessage
from email.parser import Parser as EmailParser
from eventlib.coros import queue
from eventlib.proc import spawn, ProcExit
from functools import partial
from msrplib.protocol import FailureReportHeader, SuccessReportHeader, UseNicknameHeader
from msrplib.session import MSRPSession, contains_mime_type
from otr import OTRSession, OTRTransport, OTRState, SMPStatus
from otr.cryptography import DSAPrivateKey
from otr.exceptions import IgnoreMessage, UnencryptedMessage, EncryptedMessageError, OTRError
from zope.interface import implementer
from sipsimple.core import SIPURI, BaseSIPURI
from sipsimple.payloads import ParserError
from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType
from sipsimple.storage import ISIPSimpleApplicationDataStorage
from sipsimple.streams import InvalidStreamError, UnknownStreamError
from sipsimple.streams.msrp import MSRPStreamError, MSRPStreamBase
from sipsimple.threading import run_in_thread, run_in_twisted_thread
from sipsimple.threading.green import run_in_green_thread
from sipsimple.util import MultilingualText, ISOTimestamp
__all__ = ['ChatStream', 'ChatStreamError', 'ChatIdentity', 'CPIMPayload', 'CPIMHeader', 'CPIMNamespace', 'CPIMParserError', 'OTRState', 'SMPStatus']
class OTRTrustedPeer(object):
fingerprint = WriteOnceAttribute() # in order to be hashable this needs to be immutable
def __init__(self, fingerprint, description='', **kw):
if not isinstance(fingerprint, str):
raise TypeError("fingerprint must be a string")
self.fingerprint = fingerprint
self.description = description
self.__dict__.update(kw)
def __hash__(self):
return hash(self.fingerprint)
def __eq__(self, other):
if isinstance(other, OTRTrustedPeer):
return self.fingerprint == other.fingerprint
elif isinstance(other, str):
return self.fingerprint == other
else:
return NotImplemented
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return "{0.__class__.__name__}({0.fingerprint!r}, description={0.description!r})".format(self)
def __reduce__(self):
return self.__class__, (self.fingerprint,), self.__dict__
class OTRTrustedPeerSet(object):
def __init__(self, iterable=()):
self.__data__ = {}
self.update(iterable)
def __repr__(self):
return "{}({})".format(self.__class__.__name__, list(self.__data__.values()))
def __contains__(self, item):
return item in self.__data__
def __getitem__(self, item):
return self.__data__[item]
def __iter__(self):
return iter(list(self.__data__.values()))
def __len__(self):
return len(self.__data__)
def get(self, item, default=None):
return self.__data__.get(item, default)
def add(self, item):
if not isinstance(item, OTRTrustedPeer):
raise TypeError("item should be and instance of OTRTrustedPeer")
self.__data__[item.fingerprint] = item
def remove(self, item):
del self.__data__[item]
def discard(self, item):
self.__data__.pop(item, None)
def update(self, iterable=()):
for item in iterable:
self.add(item)
class OTRCache(object, metaclass=Singleton):
def __init__(self):
from sipsimple.application import SIPApplication
if SIPApplication.storage is None:
raise RuntimeError("Cannot access the OTR cache before SIPApplication.storage is defined")
if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage):
self.key_file = os.path.join(SIPApplication.storage.directory, 'otr.key')
self.trusted_file = os.path.join(SIPApplication.storage.directory, 'otr.trusted')
try:
self.private_key = DSAPrivateKey.load(self.key_file)
if self.private_key.key_size != 1024:
raise ValueError
except (EnvironmentError, ValueError):
self.private_key = DSAPrivateKey.generate()
self.private_key.save(self.key_file)
try:
self.trusted_peers = pickle.load(open(self.trusted_file, 'rb'))
if not isinstance(self.trusted_peers, OTRTrustedPeerSet) or not all(isinstance(item, OTRTrustedPeer) for item in self.trusted_peers):
raise ValueError("invalid OTR trusted peers file")
except Exception:
self.trusted_peers = OTRTrustedPeerSet()
self.save()
else:
self.key_file = self.trusted_file = None
self.private_key = DSAPrivateKey.generate()
self.trusted_peers = OTRTrustedPeerSet()
# def generate_private_key(self):
# self.private_key = DSAPrivateKey.generate()
# if self.key_file:
# self.private_key.save(self.key_file)
@run_in_thread('file-io')
def save(self):
if self.trusted_file is not None:
with openfile(self.trusted_file, 'wb', permissions=0o600) as trusted_file:
pickle.dump(self.trusted_peers, trusted_file)
@implementer(IObserver)
class OTREncryption(object):
def __init__(self, stream):
self.stream = stream
self.otr_cache = OTRCache()
self.otr_session = OTRSession(self.otr_cache.private_key, self.stream, supported_versions={3}) # we need at least OTR-v3 for question based SMP
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=stream)
notification_center.add_observer(self, sender=self.otr_session)
@property
def active(self):
try:
return self.otr_session.encrypted
except AttributeError:
return False
@property
def cipher(self):
return 'AES-128-CTR' if self.active else None
@property
def key_fingerprint(self):
try:
return self.otr_session.local_private_key.public_key.fingerprint
except AttributeError:
return None
@property
def peer_fingerprint(self):
try:
return self.otr_session.remote_public_key.fingerprint
except AttributeError:
return None
@property
def peer_name(self):
try:
return self.__dict__['peer_name']
except KeyError:
trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None)
if trusted_peer is None:
return ''
else:
return self.__dict__.setdefault('peer_name', trusted_peer.description)
@peer_name.setter
def peer_name(self, name):
old_name = self.peer_name
new_name = self.__dict__['peer_name'] = name
if old_name != new_name:
trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None)
if trusted_peer is not None:
trusted_peer.description = new_name
self.otr_cache.save()
notification_center = NotificationCenter()
notification_center.post_notification("ChatStreamOTRPeerNameChanged", sender=self.stream, data=NotificationData(name=name))
@property
def verified(self):
return self.peer_fingerprint in self.otr_cache.trusted_peers
@verified.setter
def verified(self, value):
peer_fingerprint = self.peer_fingerprint
old_verified = peer_fingerprint in self.otr_cache.trusted_peers
new_verified = bool(value)
if peer_fingerprint is None or new_verified == old_verified:
return
if new_verified:
self.otr_cache.trusted_peers.add(OTRTrustedPeer(peer_fingerprint, description=self.peer_name))
else:
self.otr_cache.trusted_peers.remove(peer_fingerprint)
self.otr_cache.save()
notification_center = NotificationCenter()
notification_center.post_notification("ChatStreamOTRVerifiedStateChanged", sender=self.stream, data=NotificationData(verified=new_verified))
@run_in_twisted_thread
def start(self):
if self.otr_session is not None:
self.otr_session.start()
@run_in_twisted_thread
def stop(self):
if self.otr_session is not None:
self.otr_session.stop()
@run_in_twisted_thread
def smp_verify(self, secret, question=None):
self.otr_session.smp_verify(secret, question)
@run_in_twisted_thread
def smp_answer(self, secret):
self.otr_session.smp_answer(secret)
@run_in_twisted_thread
def smp_abort(self):
self.otr_session.smp_abort()
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_MediaStreamDidStart(self, notification):
if self.stream.start_otr:
self.otr_session.start()
def _NH_MediaStreamDidEnd(self, notification):
notification.center.remove_observer(self, sender=self.stream)
notification.center.remove_observer(self, sender=self.otr_session)
self.otr_session.stop()
self.otr_session = None
self.stream = None
_NH_MediaStreamDidNotInitialize = _NH_MediaStreamDidEnd
def _NH_OTRSessionStateChanged(self, notification):
notification.center.post_notification('ChatStreamOTREncryptionStateChanged', sender=self.stream, data=notification.data)
def _NH_OTRSessionSMPVerificationDidStart(self, notification):
notification.center.post_notification('ChatStreamSMPVerificationDidStart', sender=self.stream, data=notification.data)
def _NH_OTRSessionSMPVerificationDidNotStart(self, notification):
notification.center.post_notification('ChatStreamSMPVerificationDidNotStart', sender=self.stream, data=notification.data)
def _NH_OTRSessionSMPVerificationDidEnd(self, notification):
notification.center.post_notification('ChatStreamSMPVerificationDidEnd', sender=self.stream, data=notification.data)
class ChatStreamError(MSRPStreamError): pass
class ChatStream(MSRPStreamBase):
type = 'chat'
priority = 1
msrp_session_class = MSRPSession
media_type = 'message'
accept_types = ['message/cpim', 'text/*', 'image/*', 'application/im-iscomposing+xml']
accept_wrapped_types = ['text/*', 'image/*', 'application/im-iscomposing+xml']
prefer_cpim = True
start_otr = True
def __init__(self):
super(ChatStream, self).__init__(direction='sendrecv')
self.message_queue = queue()
self.sent_messages = set()
self.incoming_queue = defaultdict(list)
self.message_queue_thread = None
self.encryption = OTREncryption(self)
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
remote_stream = remote_sdp.media[stream_index]
- if remote_stream.media != 'message':
+ if remote_stream.media != b'message':
raise UnknownStreamError
expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP'
- if remote_stream.transport != expected_transport:
+ if remote_stream.transport != expected_transport.encode():
raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport))
- if remote_stream.formats != ['*']:
+ if remote_stream.formats != [b'*']:
raise InvalidStreamError("wrong format list specified")
stream = cls()
stream.remote_role = remote_stream.attributes.getfirst('setup', 'active')
- if remote_stream.direction != 'sendrecv':
+ if remote_stream.direction != b'sendrecv':
raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction)
- remote_accept_types = remote_stream.attributes.getfirst('accept-types')
+ remote_accept_types = remote_stream.attributes.getfirst(b'accept-types')
if remote_accept_types is None:
raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute")
- if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.split()):
+ if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.decode().split()):
raise InvalidStreamError("no compatible media types found")
return stream
@property
def local_identity(self):
try:
return ChatIdentity(self.session.local_identity.uri, self.session.local_identity.display_name)
except AttributeError:
return None
@property
def remote_identity(self):
try:
return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name)
except AttributeError:
return None
@property
def private_messages_allowed(self):
return 'private-messages' in self.chatroom_capabilities
@property
def nickname_allowed(self):
return 'nickname' in self.chatroom_capabilities
@property
def chatroom_capabilities(self):
try:
if self.cpim_enabled and self.session.remote_focus:
return ' '.join(self.remote_media.attributes.getall('chatroom')).split()
except AttributeError:
pass
return []
def _NH_MediaStreamDidStart(self, notification):
self.message_queue_thread = spawn(self._message_queue_handler)
def _NH_MediaStreamDidNotInitialize(self, notification):
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed')
notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _NH_MediaStreamDidEnd(self, notification):
if self.message_queue_thread is not None:
self.message_queue_thread.kill()
else:
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _handle_REPORT(self, chunk):
# in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message.
if chunk.message_id in self.sent_messages:
self.sent_messages.remove(chunk.message_id)
notification_center = NotificationCenter()
data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment)
if chunk.status.code == 200:
notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data)
else:
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _handle_SEND(self, chunk):
if chunk.size == 0: # keep-alive
self.msrp_session.send_report(chunk, 200, 'OK')
return
content_type = chunk.content_type.lower()
if not contains_mime_type(self.accept_types, content_type):
self.msrp_session.send_report(chunk, 413, 'Unwanted Message')
return
if chunk.contflag == '#':
self.incoming_queue.pop(chunk.message_id, None)
self.msrp_session.send_report(chunk, 200, 'OK')
return
elif chunk.contflag == '+':
self.incoming_queue[chunk.message_id].append(chunk.data)
self.msrp_session.send_report(chunk, 200, 'OK')
return
else:
data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data
if content_type == 'message/cpim':
try:
payload = CPIMPayload.decode(data)
except CPIMParserError:
self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error')
return
else:
message = Message(**{name: getattr(payload, name) for name in Message.__slots__})
if not contains_mime_type(self.accept_wrapped_types, message.content_type):
self.msrp_session.send_report(chunk, 413, 'Unwanted Message')
return
if message.timestamp is None:
message.timestamp = ISOTimestamp.now()
if message.sender is None:
message.sender = self.remote_identity
private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity
else:
payload = SimplePayload.decode(data, content_type)
message = Message(payload.content, payload.content_type, sender=self.remote_identity, recipients=[self.local_identity], timestamp=ISOTimestamp.now())
private = False
try:
message.content = self.encryption.otr_session.handle_input(message.content.encode(), message.content_type)
except IgnoreMessage:
self.msrp_session.send_report(chunk, 200, 'OK')
return
except UnencryptedMessage:
encrypted = False
encryption_active = True
except EncryptedMessageError as e:
self.msrp_session.send_report(chunk, 400, str(e))
notification_center = NotificationCenter()
notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e)))
return
except OTRError as e:
self.msrp_session.send_report(chunk, 200, 'OK')
notification_center = NotificationCenter()
notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e)))
return
else:
encrypted = encryption_active = self.encryption.active
if payload.charset is not None:
message.content = message.content.decode(payload.charset)
elif payload.content_type.startswith('text/'):
message.content.decode('utf8')
notification_center = NotificationCenter()
if message.content_type.lower() == IsComposingDocument.content_type:
try:
document = IsComposingDocument.parse(message.content)
except ParserError as e:
self.msrp_session.send_report(chunk, 400, str(e))
return
self.msrp_session.send_report(chunk, 200, 'OK')
data = NotificationData(state=document.state.value,
refresh=document.refresh.value if document.refresh is not None else 120,
content_type=document.content_type.value if document.content_type is not None else None,
last_active=document.last_active.value if document.last_active is not None else None,
sender=message.sender, recipients=message.recipients, private=private,
encrypted=encrypted, encryption_active=encryption_active)
notification_center.post_notification('ChatStreamGotComposingIndication', sender=self, data=data)
else:
self.msrp_session.send_report(chunk, 200, 'OK')
data = NotificationData(message=message, private=private, encrypted=encrypted, encryption_active=encryption_active)
notification_center.post_notification('ChatStreamGotMessage', sender=self, data=data)
def _on_transaction_response(self, message_id, response):
if message_id in self.sent_messages and response.code != 200:
self.sent_messages.remove(message_id)
data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)
NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
def _on_nickname_transaction_response(self, message_id, response):
notification_center = NotificationCenter()
if response.code == 200:
notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response))
else:
notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment))
def _message_queue_handler(self):
notification_center = NotificationCenter()
try:
while True:
message = self.message_queue.wait()
if self.msrp_session is None:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
break
try:
if isinstance(message.content, str):
message.content = message.content.encode('utf8')
charset = 'utf8'
else:
charset = None
if not isinstance(message, QueuedOTRInternalMessage):
try:
message.content = self.encryption.otr_session.handle_output(message.content, message.content_type)
except OTRError as e:
raise ChatStreamError(str(e))
message.sender = message.sender or self.local_identity
message.recipients = message.recipients or [self.remote_identity]
# check if we MUST use CPIM
need_cpim = (message.sender != self.local_identity or message.recipients != [self.remote_identity] or
message.courtesy_recipients or message.subject or message.timestamp or message.required or message.additional_headers)
if need_cpim or not contains_mime_type(self.remote_accept_types, message.content_type):
if not contains_mime_type(self.remote_accept_wrapped_types, message.content_type):
raise ChatStreamError('Unsupported content_type for outgoing message: %r' % message.content_type)
if not self.cpim_enabled:
raise ChatStreamError('Additional message meta-data cannot be sent, because the CPIM wrapper is not used')
if not self.private_messages_allowed and message.recipients != [self.remote_identity]:
raise ChatStreamError('The remote end does not support private messages')
if message.timestamp is None:
message.timestamp = ISOTimestamp.now()
payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__})
elif self.prefer_cpim and self.cpim_enabled and contains_mime_type(self.remote_accept_wrapped_types, message.content_type):
if message.timestamp is None:
message.timestamp = ISOTimestamp.now()
payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__})
else:
payload = SimplePayload(message.content, message.content_type, charset)
except ChatStreamError as e:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0])
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
continue
else:
content, content_type = payload.encode()
message_id = message.id
notify_progress = message.notify_progress
report = 'yes' if notify_progress else 'no'
chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id)
chunk.add_header(FailureReportHeader(report))
chunk.add_header(SuccessReportHeader(report))
try:
self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id))
except Exception as e:
if notify_progress:
data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e))
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
except ProcExit:
if notify_progress:
data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
raise
else:
if notify_progress:
self.sent_messages.add(message_id)
notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk))
finally:
self.message_queue_thread = None
while self.sent_messages:
message_id = self.sent_messages.pop()
data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
message_queue, self.message_queue = self.message_queue, queue()
while message_queue:
message = message_queue.wait()
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
@run_in_twisted_thread
def _enqueue_message(self, message):
if self._done:
if message.notify_progress:
data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended')
NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data)
else:
self.message_queue.send(message)
@run_in_green_thread
def _set_local_nickname(self, nickname, message_id):
if self.msrp_session is None:
# should we generate ChatStreamDidNotSetNickname here?
return
chunk = self.msrp.make_request('NICKNAME')
chunk.add_header(UseNicknameHeader(nickname or ''))
try:
self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id))
except Exception as e:
self._failure_reason = str(e)
NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason))
def inject_otr_message(self, data):
message = QueuedOTRInternalMessage(data)
self._enqueue_message(message)
def send_message(self, content, content_type='text/plain', recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None):
message = QueuedMessage(content, content_type, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers, notify_progress=True)
self._enqueue_message(message)
return message.id
def send_composing_indication(self, state, refresh=None, last_active=None, recipients=None):
content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text'))
message = QueuedMessage(content, IsComposingDocument.content_type, recipients=recipients, notify_progress=False)
self._enqueue_message(message)
return message.id
def set_local_nickname(self, nickname):
if not self.nickname_allowed:
raise ChatStreamError('Setting nickname is not supported')
message_id = '%x' % random.getrandbits(64)
self._set_local_nickname(nickname, message_id)
return message_id
OTRTransport.register(ChatStream)
# Chat related objects, including CPIM support as defined in RFC3862
#
class ChatIdentity(object):
_format_re = re.compile(r'^(?:"?(?P<display_name>[^<]*[^"\s])"?)?\s*<(?P<uri>sips?:.+)>$')
def __init__(self, uri, display_name=None):
self.uri = uri
self.display_name = display_name
def __eq__(self, other):
if isinstance(other, ChatIdentity):
return self.uri.user == other.uri.user and self.uri.host == other.uri.host
elif isinstance(other, BaseSIPURI):
return self.uri.user == other.user and self.uri.host == other.host
elif isinstance(other, str):
try:
other_uri = SIPURI.parse(other)
except Exception:
return False
else:
return self.uri.user == other_uri.user and self.uri.host == other_uri.host
else:
return NotImplemented
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return '{0.__class__.__name__}(uri={0.uri!r}, display_name={0.display_name!r})'.format(self)
def __str__(self):
if self.display_name:
return '{0.display_name} <{0.uri}>'.format(self)
else:
return '<{0.uri}>'.format(self)
@classmethod
def parse(cls, value):
match = cls._format_re.match(value)
if match is None:
raise ValueError('Cannot parse identity value: %r' % value)
return cls(SIPURI.parse(match.group('uri')), match.group('display_name'))
class Message(object):
__slots__ = 'content', 'content_type', 'sender', 'recipients', 'courtesy_recipients', 'subject', 'timestamp', 'required', 'additional_headers'
def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None):
self.content = content
self.content_type = content_type
self.sender = sender
self.recipients = recipients or []
self.courtesy_recipients = courtesy_recipients or []
self.subject = subject
self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None
self.required = required or []
self.additional_headers = additional_headers or []
class QueuedMessage(Message):
__slots__ = 'id', 'notify_progress'
def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, id=None, notify_progress=True):
super(QueuedMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers)
self.id = id or '%x' % random.getrandbits(64)
self.notify_progress = notify_progress
class QueuedOTRInternalMessage(QueuedMessage):
def __init__(self, content):
super(QueuedOTRInternalMessage, self).__init__(content, 'text/plain', notify_progress=False)
class SimplePayload(object):
def __init__(self, content, content_type, charset=None):
if not isinstance(content, bytes):
raise TypeError("content should be an instance of bytes")
self.content = content
self.content_type = content_type
self.charset = charset
def encode(self):
if self.charset is not None:
return self.content, '{0.content_type}; charset="{0.charset}"'.format(self)
else:
return self.content, str(self.content_type)
@classmethod
def decode(cls, content, content_type):
if not isinstance(content, bytes):
raise TypeError("content should be an instance of bytes")
type_helper = EmailParser().parsestr('Content-Type: {}'.format(content_type))
content_type = type_helper.get_content_type()
charset = type_helper.get_content_charset()
return cls(content, content_type, charset)
class CPIMPayload(object):
standard_namespace = 'urn:ietf:params:cpim-headers:'
headers_re = re.compile(r'(?:([^:]+?)\.)?(.+?):\s*(.+?)(?:\r\n|$)')
subject_re = re.compile(r'^(?:;lang=([a-z]{1,8}(?:-[a-z0-9]{1,8})*)\s+)?(.*)$')
namespace_re = re.compile(r'^(?:(\S+) ?)?<(.*)>$')
def __init__(self, content, content_type, charset=None, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None):
self.content = content
self.content_type = content_type
self.charset = charset
self.sender = sender
self.recipients = recipients or []
self.courtesy_recipients = courtesy_recipients or []
self.subject = subject if isinstance(subject, (MultilingualText, type(None))) else MultilingualText(subject)
self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None
self.required = required or []
self.additional_headers = additional_headers or []
def encode(self):
namespaces = {'': CPIMNamespace(self.standard_namespace)}
header_list = []
if self.sender is not None:
header_list.append('From: {}'.format(self.sender))
header_list.extend('To: {}'.format(recipient) for recipient in self.recipients)
header_list.extend('cc: {}'.format(recipient) for recipient in self.courtesy_recipients)
if self.subject is not None:
header_list.append('Subject: {}'.format(self.subject))
header_list.extend('Subject:;lang={} {}'.format(language, translation) for language, translation in list(self.subject.translations.items()))
if self.timestamp is not None:
header_list.append('DateTime: {}'.format(self.timestamp))
if self.required:
header_list.append('Required: {}'.format(','.join(self.required)))
for header in self.additional_headers:
if namespaces.get(header.namespace.prefix) != header.namespace:
if header.namespace.prefix:
header_list.append('NS: {0.namespace.prefix} <{0.namespace}>'.format(header))
else:
header_list.append('NS: <{0.namespace}>'.format(header))
namespaces[header.namespace.prefix] = header.namespace
if header.namespace.prefix:
header_list.append('{0.namespace.prefix}.{0.name}: {0.value}'.format(header))
else:
header_list.append('{0.name}: {0.value}'.format(header))
headers = '\r\n'.join(header_list)
mime_message = EmailMessage()
mime_message.set_payload(self.content)
mime_message.set_type(self.content_type)
if self.charset is not None:
mime_message.set_param('charset', self.charset)
return headers + '\r\n\r\n' + mime_message.as_string(), 'message/cpim'
@classmethod
def decode(cls, message):
headers, separator, body = message.partition('\r\n\r\n')
if not separator:
raise CPIMParserError('Invalid CPIM message')
sender = None
recipients = []
courtesy_recipients = []
subject = None
timestamp = None
required = []
additional_headers = []
namespaces = {'': CPIMNamespace(cls.standard_namespace)}
subjects = {}
for prefix, name, value in cls.headers_re.findall(headers):
namespace = namespaces.get(prefix)
if namespace is None or '.' in name:
continue
try:
#value = value.decode('cpim-header')
if namespace == cls.standard_namespace:
if name == 'From':
sender = ChatIdentity.parse(value)
elif name == 'To':
recipients.append(ChatIdentity.parse(value))
elif name == 'cc':
courtesy_recipients.append(ChatIdentity.parse(value))
elif name == 'Subject':
match = cls.subject_re.match(value)
if match is None:
raise ValueError('Illegal Subject header: %r' % value)
lang, subject = match.groups()
# language tags must be ASCII
subjects[str(lang) if lang is not None else None] = subject
elif name == 'DateTime':
timestamp = ISOTimestamp(value)
elif name == 'Required':
required.extend(re.split(r'\s*,\s*', value))
elif name == 'NS':
match = cls.namespace_re.match(value)
if match is None:
raise ValueError('Illegal NS header: %r' % value)
prefix, uri = match.groups()
namespaces[prefix] = CPIMNamespace(uri, prefix)
else:
additional_headers.append(CPIMHeader(name, namespace, value))
else:
additional_headers.append(CPIMHeader(name, namespace, value))
except ValueError:
pass
if None in subjects:
subject = MultilingualText(subjects.pop(None), **subjects)
elif subjects:
subject = MultilingualText(**subjects)
mime_message = EmailParser().parsestr(body)
content_type = mime_message.get_content_type()
if content_type is None:
raise CPIMParserError("CPIM message missing Content-Type MIME header")
content = mime_message.get_payload()
charset = mime_message.get_content_charset()
return cls(content, content_type, charset, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers)
class CPIMParserError(Exception): pass
class CPIMNamespace(str):
def __new__(cls, value, prefix=''):
obj = str.__new__(cls, value)
obj.prefix = prefix
return obj
class CPIMHeader(object):
def __init__(self, name, namespace, value):
self.name = name
self.namespace = namespace
self.value = value
class CPIMCodec(codecs.Codec):
character_map = {c: '\\u{:04x}'.format(c) for c in list(range(32)) + [127]}
character_map[ord('\\')] = '\\\\'
@classmethod
def encode(cls, input, errors='strict'):
return input.translate(cls.character_map).encode('utf-8', errors), len(input)
@classmethod
def decode(cls, input, errors='strict'):
return input.decode('utf-8', errors).encode('raw-unicode-escape', errors).decode('unicode-escape', errors), len(input)
def cpim_codec_search(name):
if name.lower() in ('cpim-header', 'cpim_header'):
return codecs.CodecInfo(name='CPIM-header',
encode=CPIMCodec.encode,
decode=CPIMCodec.decode,
incrementalencoder=codecs.IncrementalEncoder,
incrementaldecoder=codecs.IncrementalDecoder,
streamwriter=codecs.StreamWriter,
streamreader=codecs.StreamReader)
codecs.register(cpim_codec_search)
del cpim_codec_search
diff --git a/sipsimple/streams/msrp/filetransfer.py b/sipsimple/streams/msrp/filetransfer.py
index d096e98d..225a02b2 100644
--- a/sipsimple/streams/msrp/filetransfer.py
+++ b/sipsimple/streams/msrp/filetransfer.py
@@ -1,735 +1,735 @@
"""
This module provides classes to parse and generate SDP related to SIP sessions that negotiate File Transfer.
"""
__all__ = ['FileTransferStream', 'FileSelector']
import pickle as pickle
import hashlib
import mimetypes
import os
import random
import re
import time
import uuid
from abc import ABCMeta, abstractmethod
from application.notification import NotificationCenter, NotificationData, IObserver
from application.python.threadpool import ThreadPool, run_in_threadpool
from application.python.types import MarkerType
from application.system import FileExistsError, makedirs, openfile, unlink
from itertools import count
from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader, IntegerHeaderType, MSRPNamedHeader, HeaderParsingError
from msrplib.session import MSRPSession
from msrplib.transport import make_response
from queue import Queue
from threading import Event, Lock
from zope.interface import implementer
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import SDPAttribute
from sipsimple.storage import ISIPSimpleApplicationDataStorage
from sipsimple.streams import InvalidStreamError, UnknownStreamError
from sipsimple.streams.msrp import MSRPStreamBase
from sipsimple.threading import run_in_twisted_thread, run_in_thread
from sipsimple.util import sha1
HASH = type(hashlib.sha1())
class RandomID(metaclass=MarkerType): pass
class FileSelectorHash(str):
_hash_re = re.compile(r'^sha-1(:[0-9A-F]{2}){20}$')
_byte_re = re.compile(r'..')
def __new__(cls, value):
if isinstance(value, str):
if value.startswith('sha1:'): # backward compatibility hack (sort of).
value = 'sha-1' + value[len('sha1'):]
if not cls._hash_re.match(value):
raise ValueError("Invalid hash value: {!r}".format(value))
return super(FileSelectorHash, cls).__new__(cls, value)
elif isinstance(value, (HASH, sha1)):
return super(FileSelectorHash, cls).__new__(cls, cls.encode_hash(value))
else:
raise ValueError("Invalid hash value: {!r}".format(value))
def __eq__(self, other):
if isinstance(other, str):
return super(FileSelectorHash, self).__eq__(other)
elif isinstance(other, (HASH, sha1)) and other.name.lower() == 'sha1':
return super(FileSelectorHash, self).__eq__(self.encode_hash(other))
else:
return NotImplemented
def __ne__(self, other):
return not self == other
@classmethod
def encode_hash(cls, hash_instance):
if hash_instance.name.lower() != 'sha1':
raise TypeError("Invalid hash type: {.name} (only sha1 hashes are supported).".format(hash_instance))
# unexpected as it may be, using a regular expression is the fastest method to do this
return 'sha-1:' + ':'.join(cls._byte_re.findall(hash_instance.hexdigest().upper()))
class FileSelector(object):
_name_re = re.compile(r'name:"([^"]+)"')
_size_re = re.compile(r'size:(\d+)')
_type_re = re.compile(r'type:([^ ]+)')
_hash_re = re.compile(r'hash:([^ ]+)')
def __init__(self, name=None, type=None, size=None, hash=None, fd=None):
# If present, hash should be a sha1 object or a string in the form: sha-1:72:24:5F:E8:65:3D:DA:F3:71:36:2F:86:D4:71:91:3E:E4:A2:CE:2E
# According to the specification, only sha1 is supported ATM.
self.name = name
self.type = type
self.size = size
self.hash = hash
self.fd = fd
@property
def hash(self):
return self.__dict__['hash']
@hash.setter
def hash(self, value):
self.__dict__['hash'] = None if value is None else FileSelectorHash(value)
@classmethod
def parse(cls, string):
name_match = cls._name_re.search(string)
size_match = cls._size_re.search(string)
type_match = cls._type_re.search(string)
hash_match = cls._hash_re.search(string)
name = name_match and name_match.group(1).decode('utf-8')
size = size_match and int(size_match.group(1))
type = type_match and type_match.group(1)
hash = hash_match and hash_match.group(1)
return cls(name, type, size, hash)
@classmethod
def for_file(cls, path, type=None, hash=None):
name = str(path)
fd = open(name, 'rb')
size = os.fstat(fd.fileno()).st_size
if type is None:
mime_type, encoding = mimetypes.guess_type(name)
if encoding is not None:
type = 'application/x-%s' % encoding
elif mime_type is not None:
type = mime_type
else:
type = 'application/octet-stream'
return cls(name, type, size, hash, fd)
@property
def sdp_repr(self):
items = [('name', self.name and '"%s"' % os.path.basename(self.name).encode('utf-8')), ('type', self.type), ('size', self.size), ('hash', self.hash)]
return ' '.join('%s:%s' % (name, value) for name, value in items if value is not None)
class UniqueFilenameGenerator(object):
@classmethod
def generate(cls, name):
yield name
prefix, extension = os.path.splitext(name)
for x in count(1):
yield "%s-%d%s" % (prefix, x, extension)
class FileMetadataEntry(object):
def __init__(self, hash, filename, partial_hash=None):
self.hash = hash
self.filename = filename
self.mtime = os.path.getmtime(self.filename)
self.partial_hash = partial_hash
@classmethod
def from_selector(cls, file_selector):
return cls(file_selector.hash.lower(), file_selector.name)
class FileTransfersMetadata(object):
__filename__ = 'transfer_metadata'
__lifetime__ = 60*60*24*7
def __init__(self):
self.data = {}
self.lock = Lock()
self.loaded = False
self.directory = None
def _load(self):
if self.loaded:
return
from sipsimple.application import SIPApplication
if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage):
self.directory = SIPApplication.storage.directory
if self.directory is not None:
try:
with open(os.path.join(self.directory, self.__filename__), 'rb') as f:
data = pickle.loads(f.read())
except Exception:
data = {}
now = time.time()
for hash, entry in list(data.items()):
try:
mtime = os.path.getmtime(entry.filename)
except OSError:
data.pop(hash)
else:
if mtime != entry.mtime or now - mtime > self.__lifetime__:
data.pop(hash)
self.data.update(data)
self.loaded = True
@run_in_thread('file-io')
def _save(self, data):
if self.directory is not None:
with open(os.path.join(self.directory, self.__filename__), 'wb') as f:
f.write(data)
def __enter__(self):
self.lock.acquire()
self._load()
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
if None is exc_type is exc_val is exc_tb:
self._save(pickle.dumps(self.data))
self.lock.release()
@implementer(IObserver)
class FileTransferHandler(object, metaclass=ABCMeta):
threadpool = ThreadPool(name='FileTransfers', min_threads=0, max_threads=100)
threadpool.start()
def __init__(self):
self.stream = None
self.session = None
self._started = False
self._session_started = False
self._initialize_done = False
self._initialize_successful = False
def initialize(self, stream, session):
self.stream = stream
self.session = session
notification_center = NotificationCenter()
notification_center.add_observer(self, sender=stream)
notification_center.add_observer(self, sender=session)
notification_center.add_observer(self, sender=self)
@property
def filename(self):
return self.stream.file_selector.name if self.stream is not None else None
@abstractmethod
def start(self):
raise NotImplementedError
@abstractmethod
def end(self):
raise NotImplementedError
@abstractmethod
def process_chunk(self, chunk):
raise NotImplementedError
def __terminate(self):
notification_center = NotificationCenter()
notification_center.remove_observer(self, sender=self.stream)
notification_center.remove_observer(self, sender=self.session)
notification_center.remove_observer(self, sender=self)
try:
self.stream.file_selector.fd.close()
except AttributeError: # when self.stream.file_selector.fd is None
pass
except IOError: # we can get this if we try to close while another thread is reading from it
pass
self.stream = None
self.session = None
@run_in_twisted_thread
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, None)
if handler is not None:
handler(notification)
def _NH_MediaStreamDidNotInitialize(self, notification):
if not self._initialize_done:
self.end()
self.__terminate()
def _NH_MediaStreamDidStart(self, notification):
self._started = True
self.start()
def _NH_MediaStreamWillEnd(self, notification):
if self._started:
self.end()
elif self._session_started:
notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Refused'))
def _NH_SIPSessionWillStart(self, notification):
self._session_started = True
def _NH_SIPSessionDidFail(self, notification):
if not self._session_started and self._initialize_successful:
if notification.data.code == 487:
reason = 'Cancelled'
else:
reason = notification.data.reason or 'Failed'
notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=reason))
def _NH_FileTransferHandlerDidInitialize(self, notification):
self._initialize_done = True
self._initialize_successful = True
def _NH_FileTransferHandlerDidNotInitialize(self, notification):
self._initialize_done = True
self._initialize_successful = False
def _NH_FileTransferHandlerDidEnd(self, notification):
self.__terminate()
class OffsetHeader(MSRPNamedHeader):
name = 'Offset'
type = IntegerHeaderType
class EndTransfer(metaclass=MarkerType): pass
class IncomingFileTransferHandler(FileTransferHandler):
metadata = FileTransfersMetadata()
def __init__(self):
super(IncomingFileTransferHandler, self).__init__()
self.hash = sha1()
self.queue = Queue()
self.offset = 0
self.received_chunks = 0
@property
def save_directory(self):
return self.__dict__.get('save_directory')
@save_directory.setter
def save_directory(self, value):
if self.stream is not None:
raise AttributeError('cannot set save_directory, transfer is in progress')
self.__dict__['save_directory'] = value
def initialize(self, stream, session):
super(IncomingFileTransferHandler, self).initialize(stream, session)
try:
directory = self.save_directory or SIPSimpleSettings().file_transfer.directory.normalized
makedirs(directory)
with self.metadata as metadata:
try:
prev_file = metadata.pop(stream.file_selector.hash.lower())
mtime = os.path.getmtime(prev_file.filename)
if mtime != prev_file.mtime:
raise ValueError('file was modified')
filename = os.path.join(directory, os.path.basename(stream.file_selector.name))
try:
os.link(prev_file.filename, filename)
except (AttributeError, OSError):
stream.file_selector.name = prev_file.filename
else:
stream.file_selector.name = filename
unlink(prev_file.filename)
stream.file_selector.fd = openfile(stream.file_selector.name, 'ab') # open doesn't seek to END in append mode on win32 until first write, but openfile does
self.offset = stream.file_selector.fd.tell()
self.hash = prev_file.partial_hash
except (KeyError, EnvironmentError, ValueError):
for name in UniqueFilenameGenerator.generate(os.path.join(directory, os.path.basename(stream.file_selector.name))):
try:
stream.file_selector.fd = openfile(name, 'xb')
except FileExistsError:
continue
else:
stream.file_selector.name = name
break
except Exception as e:
NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e)))
else:
NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self)
def end(self):
self.queue.put(EndTransfer)
def process_chunk(self, chunk):
if chunk.method == 'SEND':
if not self.received_chunks and chunk.byte_range.start == 1:
self.stream.file_selector.fd.truncate(0)
self.stream.file_selector.fd.seek(0)
self.hash = sha1()
self.offset = 0
self.received_chunks += 1
self.queue.put(chunk)
elif chunk.method == 'FILE_OFFSET':
if self.received_chunks > 0:
response = make_response(chunk, 413, 'Unwanted message')
else:
offset = self.stream.file_selector.fd.tell()
response = make_response(chunk, 200, 'OK')
response.add_header(OffsetHeader(offset))
self.stream.msrp_session.send_chunk(response)
@run_in_threadpool(FileTransferHandler.threadpool)
def start(self):
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferHandlerDidStart', sender=self)
file_selector = self.stream.file_selector
fd = file_selector.fd
while True:
chunk = self.queue.get()
if chunk is EndTransfer:
break
try:
fd.write(chunk.data)
except EnvironmentError as e:
fd.close()
notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e)))
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=str(e)))
return
self.hash.update(chunk.data)
self.offset += chunk.size
transferred_bytes = chunk.byte_range.start + chunk.size - 1
total_bytes = file_selector.size = chunk.byte_range.total
notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes))
if transferred_bytes == total_bytes:
break
fd.close()
# Transfer is finished
if self.offset != self.stream.file_selector.size:
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete file'))
return
if self.hash != self.stream.file_selector.hash:
unlink(self.filename) # something got corrupted, better delete the file
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='File hash mismatch'))
return
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None))
def _NH_MediaStreamDidNotInitialize(self, notification):
if self.stream.file_selector.fd is not None:
position = self.stream.file_selector.fd.tell()
self.stream.file_selector.fd.close()
if position == 0:
unlink(self.stream.file_selector.name)
super(IncomingFileTransferHandler, self)._NH_MediaStreamDidNotInitialize(notification)
def _NH_FileTransferHandlerDidEnd(self, notification):
if notification.data.error and self.stream.file_selector.hash is not None:
if os.path.getsize(self.stream.file_selector.name) == 0:
unlink(self.stream.file_selector.name)
else:
with self.metadata as metadata:
entry = FileMetadataEntry.from_selector(self.stream.file_selector)
entry.partial_hash = self.hash
metadata[entry.hash] = entry
super(IncomingFileTransferHandler, self)._NH_FileTransferHandlerDidEnd(notification)
class OutgoingFileTransferHandler(FileTransferHandler):
file_part_size = 64*1024
def __init__(self):
super(OutgoingFileTransferHandler, self).__init__()
self.stop_event = Event()
self.finished_event = Event()
self.file_offset_event = Event()
self.message_id = '%x' % random.getrandbits(64)
self.offset = 0
def initialize(self, stream, session):
super(OutgoingFileTransferHandler, self).initialize(stream, session)
if stream.file_selector.fd is None:
NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file descriptor not specified'))
return
if stream.file_selector.size == 0:
NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file is empty'))
return
if stream.file_selector.hash is None:
self._calculate_file_hash()
else:
NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self)
@run_in_threadpool(FileTransferHandler.threadpool)
def _calculate_file_hash(self):
file_hash = hashlib.sha1()
processed = 0
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=0, total=self.stream.file_selector.size))
file_selector = self.stream.file_selector
fd = file_selector.fd
while not self.stop_event.is_set():
try:
content = fd.read(self.file_part_size)
except EnvironmentError as e:
fd.close()
notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e)))
return
if not content:
file_selector.hash = file_hash
notification_center.post_notification('FileTransferHandlerDidInitialize', sender=self)
break
file_hash.update(content)
processed += len(content)
notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=processed, total=file_selector.size))
else:
fd.close()
notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted transfer'))
def end(self):
self.stop_event.set()
self.file_offset_event.set() # in case we are busy waiting on it
@run_in_threadpool(FileTransferHandler.threadpool)
def start(self):
notification_center = NotificationCenter()
notification_center.post_notification('FileTransferHandlerDidStart', sender=self)
if self.stream.file_offset_supported:
self._send_file_offset_chunk()
self.file_offset_event.wait()
finished = False
failure_reason = None
fd = self.stream.file_selector.fd
fd.seek(self.offset)
try:
while not self.stop_event.is_set():
try:
data = fd.read(self.file_part_size)
except EnvironmentError as e:
failure_reason = str(e)
break
if not data:
finished = True
break
self._send_chunk(data)
finally:
fd.close()
if not finished:
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=failure_reason or 'Interrupted transfer'))
return
# Wait until the stream ends or we get all reports
self.stop_event.wait()
if self.finished_event.is_set():
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None))
else:
notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete transfer'))
def _on_transaction_response(self, response):
if self.stop_event.is_set():
return
if response.code != 200:
NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=response.comment))
self.end()
@run_in_twisted_thread
def _send_chunk(self, data):
if self.stop_event.is_set():
return
data_len = len(data)
chunk = self.stream.msrp.make_send_request(message_id=self.message_id,
data=data,
start=self.offset+1,
end=self.offset+data_len,
length=self.stream.file_selector.size)
chunk.add_header(ContentTypeHeader(self.stream.file_selector.type))
chunk.add_header(SuccessReportHeader('yes'))
chunk.add_header(FailureReportHeader('yes'))
try:
self.stream.msrp_session.send_chunk(chunk, response_cb=self._on_transaction_response)
except Exception as e:
NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e)))
else:
self.offset += data_len
@run_in_twisted_thread
def _send_file_offset_chunk(self):
def response_cb(response):
if not self.stop_event.is_set() and response.code == 200:
try:
offset = response.headers['Offset'].decoded
except (KeyError, HeaderParsingError):
offset = 0
self.offset = offset
self.file_offset_event.set()
if self.stop_event.is_set():
self.file_offset_event.set()
return
chunk = self.stream.msrp.make_request('FILE_OFFSET') # TODO: _ is illegal in MSRP method names according to RFC 4975
try:
self.stream.msrp_session.send_chunk(chunk, response_cb=response_cb)
except Exception as e:
NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e)))
def process_chunk(self, chunk):
# here we process the REPORT chunks
notification_center = NotificationCenter()
if chunk.status.code == 200:
transferred_bytes = chunk.byte_range.end
total_bytes = chunk.byte_range.total
notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes))
if transferred_bytes == total_bytes:
self.finished_event.set()
self.end()
else:
notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=chunk.status.comment))
self.end()
class FileTransferMSRPSession(MSRPSession):
def _handle_incoming_FILE_OFFSET(self, chunk):
self._on_incoming_cb(chunk)
class FileTransferStream(MSRPStreamBase):
type = 'file-transfer'
priority = 10
msrp_session_class = FileTransferMSRPSession
media_type = 'message'
accept_types = ['*']
accept_wrapped_types = None
IncomingTransferHandler = IncomingFileTransferHandler
OutgoingTransferHandler = OutgoingFileTransferHandler
def __init__(self, file_selector, direction, transfer_id=RandomID):
if direction not in ('sendonly', 'recvonly'):
raise ValueError("direction must be one of 'sendonly' or 'recvonly'")
super(FileTransferStream, self).__init__(direction=direction)
self.file_selector = file_selector
self.transfer_id = transfer_id if transfer_id is not RandomID else str(uuid.uuid4())
if direction == 'sendonly':
self.handler = self.OutgoingTransferHandler()
else:
self.handler = self.IncomingTransferHandler()
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
remote_stream = remote_sdp.media[stream_index]
- if remote_stream.media != 'message' or 'file-selector' not in remote_stream.attributes:
+ if remote_stream.media != b'message' or b'file-selector' not in remote_stream.attributes:
raise UnknownStreamError
expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport == 'tls' else 'TCP/MSRP'
if remote_stream.transport != expected_transport:
raise InvalidStreamError("expected %s transport in file transfer stream, got %s" % (expected_transport, remote_stream.transport))
if remote_stream.formats != ['*']:
raise InvalidStreamError("wrong format list specified")
try:
file_selector = FileSelector.parse(remote_stream.attributes.getfirst('file-selector'))
except Exception as e:
raise InvalidStreamError("error parsing file-selector: {}".format(e))
transfer_id = remote_stream.attributes.getfirst('file-transfer-id', None)
if remote_stream.direction == 'sendonly':
stream = cls(file_selector, 'recvonly', transfer_id)
elif remote_stream.direction == 'recvonly':
stream = cls(file_selector, 'sendonly', transfer_id)
else:
raise InvalidStreamError("wrong stream direction specified")
stream.remote_role = remote_stream.attributes.getfirst('setup', 'active')
return stream
def initialize(self, session, direction):
self._initialize_args = session, direction
NotificationCenter().add_observer(self, sender=self.handler)
self.handler.initialize(self, session)
def _create_local_media(self, uri_path):
local_media = super(FileTransferStream, self)._create_local_media(uri_path)
- local_media.attributes.append(SDPAttribute('file-selector', self.file_selector.sdp_repr))
- local_media.attributes.append(SDPAttribute('x-file-offset', ''))
+ local_media.attributes.append(SDPAttribute(b'file-selector', self.file_selector.sdp_repr.encode()))
+ local_media.attributes.append(SDPAttribute(b'x-file-offset', b''))
if self.transfer_id is not None:
- local_media.attributes.append(SDPAttribute('file-transfer-id', self.transfer_id))
+ local_media.attributes.append(SDPAttribute(b'file-transfer-id', self.transfer_id.encode()))
return local_media
@property
def file_offset_supported(self):
try:
return 'x-file-offset' in self.remote_media.attributes
except AttributeError:
return False
@run_in_twisted_thread
def _NH_FileTransferHandlerDidInitialize(self, notification):
session, direction = self._initialize_args
del self._initialize_args
if not self._done:
super(FileTransferStream, self).initialize(session, direction)
@run_in_twisted_thread
def _NH_FileTransferHandlerDidNotInitialize(self, notification):
del self._initialize_args
if not self._done:
notification.center.post_notification('MediaStreamDidNotInitialize', sender=self, data=notification.data)
@run_in_twisted_thread
def _NH_FileTransferHandlerError(self, notification):
self._failure_reason = notification.data.error
notification.center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='transferring', reason=self._failure_reason))
def _NH_MediaStreamDidNotInitialize(self, notification):
notification.center.remove_observer(self, sender=self.handler)
def _NH_MediaStreamWillEnd(self, notification):
notification.center.remove_observer(self, sender=self.handler)
def _handle_REPORT(self, chunk):
# in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message.
self.handler.process_chunk(chunk)
def _handle_SEND(self, chunk):
notification_center = NotificationCenter()
if chunk.size == 0:
# keep-alive
self.msrp_session.send_report(chunk, 200, 'OK')
return
if self.direction=='sendonly':
self.msrp_session.send_report(chunk, 413, 'Unwanted Message')
return
if chunk.content_type.lower() == 'message/cpim':
# In order to properly support the CPIM wrapper, msrplib needs to be refactored. -Luci
self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type')
self._failure_reason = "CPIM wrapper is not supported"
notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason))
return
try:
self.msrp_session.send_report(chunk, 200, 'OK')
except Exception:
pass # Best effort approach: even if we couldn't send the REPORT keep writing the chunks, we might have them all -Saul
self.handler.process_chunk(chunk)
def _handle_FILE_OFFSET(self, chunk):
if self.direction != 'recvonly':
response = make_response(chunk, 413, 'Unwanted message')
self.msrp_session.send_chunk(response)
return
self.handler.process_chunk(chunk)
diff --git a/sipsimple/streams/msrp/screensharing.py b/sipsimple/streams/msrp/screensharing.py
index d991fedb..11f215ec 100644
--- a/sipsimple/streams/msrp/screensharing.py
+++ b/sipsimple/streams/msrp/screensharing.py
@@ -1,351 +1,353 @@
"""
This module provides classes to parse and generate SDP related to SIP sessions that negotiate Screen Sharing.
"""
__all__ = ['ScreenSharingStream', 'VNCConnectionError', 'ScreenSharingHandler', 'ScreenSharingServerHandler', 'ScreenSharingViewerHandler',
'InternalVNCViewerHandler', 'InternalVNCServerHandler', 'ExternalVNCViewerHandler', 'ExternalVNCServerHandler']
from abc import ABCMeta, abstractmethod, abstractproperty
from application.notification import NotificationCenter, NotificationData, IObserver
from application.python.descriptor import WriteOnceAttribute
from eventlib.coros import queue
from eventlib.greenio import GreenSocket
from eventlib.proc import spawn
from eventlib.util import tcp_socket, set_reuse_addr
from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader
from msrplib.transport import make_response, make_report
from twisted.internet.error import ConnectionDone
from zope.interface import implementer
from sipsimple.core import SDPAttribute
from sipsimple.streams import InvalidStreamError, UnknownStreamError
from sipsimple.streams.msrp import MSRPStreamBase
from sipsimple.threading import run_in_twisted_thread
class VNCConnectionError(Exception): pass
@implementer(IObserver)
class ScreenSharingHandler(object, metaclass=ABCMeta):
def __init__(self):
self.incoming_msrp_queue = None
self.outgoing_msrp_queue = None
self.msrp_reader_thread = None
self.msrp_writer_thread = None
def initialize(self, stream):
self.incoming_msrp_queue = stream.incoming_queue
self.outgoing_msrp_queue = stream.outgoing_queue
NotificationCenter().add_observer(self, sender=stream)
@abstractproperty
def type(self):
raise NotImplementedError
@abstractmethod
def _msrp_reader(self):
raise NotImplementedError
@abstractmethod
def _msrp_writer(self):
raise NotImplementedError
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, None)
if handler is not None:
handler(notification)
def _NH_MediaStreamDidStart(self, notification):
self.msrp_reader_thread = spawn(self._msrp_reader)
self.msrp_writer_thread = spawn(self._msrp_writer)
def _NH_MediaStreamWillEnd(self, notification):
notification.center.remove_observer(self, sender=notification.sender)
if self.msrp_reader_thread is not None:
self.msrp_reader_thread.kill()
self.msrp_reader_thread = None
if self.msrp_writer_thread is not None:
self.msrp_writer_thread.kill()
self.msrp_writer_thread = None
class ScreenSharingServerHandler(ScreenSharingHandler):
type = property(lambda self: 'passive')
class ScreenSharingViewerHandler(ScreenSharingHandler):
type = property(lambda self: 'active')
class InternalVNCViewerHandler(ScreenSharingViewerHandler):
@run_in_twisted_thread
def send(self, data):
self.outgoing_msrp_queue.send(data)
def _msrp_reader(self):
notification_center = NotificationCenter()
while True:
data = self.incoming_msrp_queue.wait()
notification_center.post_notification('ScreenSharingStreamGotData', sender=self, data=NotificationData(data=data))
def _msrp_writer(self):
pass
class InternalVNCServerHandler(ScreenSharingServerHandler):
@run_in_twisted_thread
def send(self, data):
self.outgoing_msrp_queue.send(data)
def _msrp_reader(self):
notification_center = NotificationCenter()
while True:
data = self.incoming_msrp_queue.wait()
notification_center.post_notification('ScreenSharingStreamGotData', sender=self, data=NotificationData(data=data))
def _msrp_writer(self):
pass
class ExternalVNCViewerHandler(ScreenSharingViewerHandler):
address = ('localhost', 0)
connect_timeout = 5
def __init__(self):
super(ExternalVNCViewerHandler, self).__init__()
self.vnc_starter_thread = None
self.vnc_socket = GreenSocket(tcp_socket())
set_reuse_addr(self.vnc_socket)
self.vnc_socket.settimeout(self.connect_timeout)
self.vnc_socket.bind(self.address)
self.vnc_socket.listen(1)
self.address = self.vnc_socket.getsockname()
def _msrp_reader(self):
while True:
try:
data = self.incoming_msrp_queue.wait()
self.vnc_socket.sendall(data)
except Exception as e:
self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='sending', reason=str(e)))
break
def _msrp_writer(self):
while True:
try:
data = self.vnc_socket.recv(2048)
if not data:
raise VNCConnectionError("connection with the VNC viewer was closed")
self.outgoing_msrp_queue.send(data)
except Exception as e:
self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='reading', reason=str(e)))
break
def _start_vnc_connection(self):
try:
sock, addr = self.vnc_socket.accept()
self.vnc_socket.close()
self.vnc_socket = sock
self.vnc_socket.settimeout(None)
except Exception as e:
self.vnc_starter_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='connecting', reason=str(e)))
else:
self.msrp_reader_thread = spawn(self._msrp_reader)
self.msrp_writer_thread = spawn(self._msrp_writer)
finally:
self.vnc_starter_thread = None
def _NH_MediaStreamDidStart(self, notification):
self.vnc_starter_thread = spawn(self._start_vnc_connection)
def _NH_MediaStreamWillEnd(self, notification):
if self.vnc_starter_thread is not None:
self.vnc_starter_thread.kill()
self.vnc_starter_thread = None
super(ExternalVNCViewerHandler, self)._NH_MediaStreamWillEnd(notification)
self.vnc_socket.close()
class ExternalVNCServerHandler(ScreenSharingServerHandler):
address = ('localhost', 5900)
connect_timeout = 5
def __init__(self):
super(ExternalVNCServerHandler, self).__init__()
self.vnc_starter_thread = None
self.vnc_socket = None
def _msrp_reader(self):
while True:
try:
data = self.incoming_msrp_queue.wait()
self.vnc_socket.sendall(data)
except Exception as e:
self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='sending', reason=str(e)))
break
def _msrp_writer(self):
while True:
try:
data = self.vnc_socket.recv(2048)
if not data:
raise VNCConnectionError("connection to the VNC server was closed")
self.outgoing_msrp_queue.send(data)
except Exception as e:
self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='reading', reason=str(e)))
break
def _start_vnc_connection(self):
try:
self.vnc_socket = GreenSocket(tcp_socket())
self.vnc_socket.settimeout(self.connect_timeout)
self.vnc_socket.connect(self.address)
self.vnc_socket.settimeout(None)
except Exception as e:
self.vnc_starter_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='connecting', reason=str(e)))
else:
self.msrp_reader_thread = spawn(self._msrp_reader)
self.msrp_writer_thread = spawn(self._msrp_writer)
finally:
self.vnc_starter_thread = None
def _NH_MediaStreamDidStart(self, notification):
self.vnc_starter_thread = spawn(self._start_vnc_connection)
def _NH_MediaStreamWillEnd(self, notification):
if self.vnc_starter_thread is not None:
self.vnc_starter_thread.kill()
self.vnc_starter_thread = None
super(ExternalVNCServerHandler, self)._NH_MediaStreamWillEnd(notification)
if self.vnc_socket is not None:
self.vnc_socket.close()
class ScreenSharingStream(MSRPStreamBase):
type = 'screen-sharing'
priority = 1
media_type = 'application'
accept_types = ['application/x-rfb']
accept_wrapped_types = None
ServerHandler = InternalVNCServerHandler
ViewerHandler = InternalVNCViewerHandler
handler = WriteOnceAttribute()
def __init__(self, mode):
if mode not in ('viewer', 'server'):
raise ValueError("mode should be 'viewer' or 'server' not '%s'" % mode)
super(ScreenSharingStream, self).__init__(direction='sendrecv')
self.handler = self.ViewerHandler() if mode=='viewer' else self.ServerHandler()
self.incoming_queue = queue()
self.outgoing_queue = queue()
self.msrp_reader_thread = None
self.msrp_writer_thread = None
@classmethod
def new_from_sdp(cls, session, remote_sdp, stream_index):
remote_stream = remote_sdp.media[stream_index]
- if remote_stream.media != 'application':
+ if remote_stream.media != b'application':
raise UnknownStreamError
accept_types = remote_stream.attributes.getfirst('accept-types', None)
+ accept_types = accept_types.decode() if accept_types else None
if accept_types is None or 'application/x-rfb' not in accept_types.split():
raise UnknownStreamError
expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP'
- if remote_stream.transport != expected_transport:
+ if remote_stream.transport != expected_transport.encode():
raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport))
- if remote_stream.formats != ['*']:
+ if remote_stream.formats != [b'*']:
raise InvalidStreamError("wrong format list specified")
remote_rfbsetup = remote_stream.attributes.getfirst('rfbsetup', 'active')
+ remote_rfbsetup = remote_rfbsetup.decode() if remote_rfbsetup else None
if remote_rfbsetup == 'active':
stream = cls(mode='server')
elif remote_rfbsetup == 'passive':
stream = cls(mode='viewer')
else:
raise InvalidStreamError("unknown rfbsetup attribute in the remote screen sharing stream")
- stream.remote_role = remote_stream.attributes.getfirst('setup', 'active')
+ stream.remote_role = remote_stream.attributes.getfirst(b'setup', b'active')
return stream
def _create_local_media(self, uri_path):
local_media = super(ScreenSharingStream, self)._create_local_media(uri_path)
- local_media.attributes.append(SDPAttribute('rfbsetup', self.handler.type))
+ local_media.attributes.append(SDPAttribute(b'rfbsetup', self.handler.type.encode()))
return local_media
def _msrp_reader(self):
while True:
try:
chunk = self.msrp.read_chunk()
if chunk.method in (None, 'REPORT'):
continue
elif chunk.method == 'SEND':
if chunk.content_type in self.accept_types:
self.incoming_queue.send(chunk.data)
response = make_response(chunk, 200, 'OK')
report = make_report(chunk, 200, 'OK')
else:
response = make_response(chunk, 415, 'Invalid Content-Type')
report = None
else:
response = make_response(chunk, 501, 'Unknown method')
report = None
if response is not None:
self.msrp.write_chunk(response)
if report is not None:
self.msrp.write_chunk(report)
except Exception as e:
self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
if self.shutting_down and isinstance(e, ConnectionDone):
break
self._failure_reason = str(e)
NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason))
break
def _msrp_writer(self):
while True:
try:
data = self.outgoing_queue.wait()
chunk = self.msrp.make_send_request(data=data)
chunk.add_header(SuccessReportHeader('no'))
chunk.add_header(FailureReportHeader('partial'))
chunk.add_header(ContentTypeHeader('application/x-rfb'))
self.msrp.write_chunk(chunk)
except Exception as e:
self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification
if self.shutting_down and isinstance(e, ConnectionDone):
break
self._failure_reason = str(e)
NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason))
break
def _NH_MediaStreamDidInitialize(self, notification):
notification.center.add_observer(self, sender=self.handler)
self.handler.initialize(self)
def _NH_MediaStreamDidStart(self, notification):
self.msrp_reader_thread = spawn(self._msrp_reader)
self.msrp_writer_thread = spawn(self._msrp_writer)
def _NH_MediaStreamWillEnd(self, notification):
notification.center.remove_observer(self, sender=self.handler)
if self.msrp_reader_thread is not None:
self.msrp_reader_thread.kill()
self.msrp_reader_thread = None
if self.msrp_writer_thread is not None:
self.msrp_writer_thread.kill()
self.msrp_writer_thread = None
def _NH_ScreenSharingHandlerDidFail(self, notification):
self._failure_reason = notification.data.reason
notification.center.post_notification('MediaStreamDidFail', sender=self, data=notification.data)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:02 AM (20 h, 50 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3407159
Default Alt Text
(109 KB)

Event Timeline