diff --git a/sipsimple/streams/msrp/__init__.py b/sipsimple/streams/msrp/__init__.py index 2e91f495..aed97f03 100644 --- a/sipsimple/streams/msrp/__init__.py +++ b/sipsimple/streams/msrp/__init__.py @@ -1,407 +1,408 @@ """ Handling of MSRP media streams according to RFC4975, RFC4976, RFC5547 and RFC3994. """ __all__ = ['MSRPStreamError', 'MSRPStreamBase'] import traceback from application.notification import NotificationCenter, NotificationData, IObserver from application.python import Null from application.system import host from twisted.internet.error import ConnectionDone from zope.interface import implementer from eventlib import api from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from msrplib.protocol import URI from msrplib.session import contains_mime_type from sipsimple.account import Account, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute, SDPConnection, SDPMediaStream from sipsimple.streams import IMediaStream, MediaStreamType, StreamError from sipsimple.threading.green import run_in_green_thread class MSRPStreamError(StreamError): pass @implementer(IMediaStream, IObserver) class MSRPStreamBase(object, metaclass=MediaStreamType): # Attributes that need to be defined by each MSRP stream type type = None priority = None msrp_session_class = None media_type = None accept_types = None accept_wrapped_types = None # These attributes are always False for any MSRP stream hold_supported = False on_hold = False on_hold_by_local = False on_hold_by_remote = False def __new__(cls, *args, **kw): if cls is MSRPStreamBase: raise TypeError("MSRPStreamBase cannot be instantiated directly") return object.__new__(cls) def __init__(self, direction='sendrecv'): self.direction = direction self.greenlet = None self.local_media = None self.remote_media = None self.msrp = None # Placeholder for the MSRPTransport that will be set when started self.msrp_connector = None self.cpim_enabled = None # Boolean value. None means it was not negotiated yet self.session = None self.msrp_session = None self.shutting_down = False self.local_role = None self.remote_role = None self.transport = None self.remote_accept_types = None self.remote_accept_wrapped_types = None self._initialize_done = False self._done = False self._failure_reason = None @property def local_uri(self): msrp = self.msrp or self.msrp_connector return msrp.local_uri if msrp is not None else None def _create_local_media(self, uri_path): transport = "TCP/TLS/MSRP" if uri_path[-1].use_tls else "TCP/MSRP" attributes = [] path = " ".join(str(uri) for uri in uri_path) attributes.append(SDPAttribute(b"path", path.encode())) if self.direction not in [None, 'sendrecv']: attributes.append(SDPAttribute(self.direction.encode(), b'')) if self.accept_types is not None: a_types = " ".join(self.accept_types) attributes.append(SDPAttribute(b"accept-types", a_types.encode())) if self.accept_wrapped_types is not None: a_w_types = " ".join(self.accept_wrapped_types) attributes.append(SDPAttribute(b"accept-wrapped-types", a_w_types.encode())) attributes.append(SDPAttribute(b"setup", self.local_role.encode() if self.local_role else None)) local_ip = uri_path[-1].host connection = SDPConnection(local_ip.encode()) return SDPMediaStream(self.media_type.encode(), uri_path[-1].port or 2855, transport.encode(), connection=connection, formats=[b"*"], attributes=attributes) # The public API (the IMediaStream interface) # noinspection PyUnusedLocal def get_local_media(self, remote_sdp=None, index=0): return self.local_media def new_from_sdp(self, session, remote_sdp, stream_index): raise NotImplementedError @run_in_green_thread def initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.session.account.msrp.transport outgoing = direction == 'outgoing' logger = NotificationProxyLogger() if self.session.account is BonjourAccount(): if outgoing: self.msrp_connector = DirectConnector(logger=logger) self.local_role = 'active' else: if self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' else: if self.session.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif outgoing and not self.session.account.nat_traversal.use_msrp_relay_for_outbound: self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if self.session.account.nat_traversal.msrp_relay is None: relay_host = relay_port = None else: if self.transport != self.session.account.nat_traversal.msrp_relay.transport: raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") relay_host = self.session.account.nat_traversal.msrp_relay.host relay_port = self.session.account.nat_traversal.msrp_relay.port relay = MSRPRelaySettings(domain=self.session.account.uri.host, username=self.session.account.uri.user, password=self.session.account.credentials.password, host=relay_host, port=relay_port, use_tls=self.transport=='tls') self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(local_uri=URI(host=host.default_ip, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials)) self.local_media = self._create_local_media(full_local_path) except Exception as e: traceback.print_exc() notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: notification_center.post_notification('MediaStreamDidInitialize', sender=self) finally: self._initialize_done = True self.greenlet = None # noinspection PyUnusedLocal @run_in_green_thread def start(self, local_sdp, remote_sdp, stream_index): self.greenlet = api.getcurrent() notification_center = NotificationCenter() context = 'sdp_negotiation' try: remote_media = remote_sdp.media[stream_index] self.remote_media = remote_media - self.remote_accept_types = remote_media.attributes.getfirst('accept-types', '').split() - self.remote_accept_wrapped_types = remote_media.attributes.getfirst('accept-wrapped-types', '').split() + self.remote_accept_types = remote_media.attributes.getfirst(b'accept-types', b'').decode().split() + self.remote_accept_wrapped_types = remote_media.attributes.getfirst(b'accept-wrapped-types', b'').decode().split() self.cpim_enabled = contains_mime_type(self.accept_types, 'message/cpim') and contains_mime_type(self.remote_accept_types, 'message/cpim') - remote_uri_path = remote_media.attributes.getfirst('path') + remote_uri_path = remote_media.attributes.getfirst(b'path') if remote_uri_path is None: raise AttributeError("remote SDP media does not have 'path' attribute") - full_remote_path = [URI.parse(uri) for uri in remote_uri_path.split()] + full_remote_path = [URI.parse(uri) for uri in remote_uri_path.decode().split()] remote_transport = 'tls' if full_remote_path[0].use_tls else 'tcp' if self.transport != remote_transport: raise MSRPStreamError("remote transport ('%s') different from local transport ('%s')" % (remote_transport, self.transport)) if isinstance(self.session.account, Account) and self.local_role == 'actpass': remote_setup = remote_media.attributes.getfirst('setup', 'passive') + remote_setup = remote_setup.decode() if remote_setup else None if remote_setup == 'passive': # If actpass is offered connectors are always started as passive # We need to switch to active if the remote answers with passive if self.session.account.msrp.connection_model == 'relay': self.msrp_connector.mode = 'active' else: local_uri = self.msrp_connector.local_uri logger = self.msrp_connector.logger self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.msrp_connector.prepare(local_uri) context = 'start' self.msrp = self.msrp_connector.complete(full_remote_path) if self.msrp_session_class is not None: self.msrp_session = self.msrp_session_class(self.msrp, accept_types=self.accept_types, on_incoming_cb=self._handle_incoming, automatic_reports=False) self.msrp_connector = None except Exception as e: self._failure_reason = str(e) notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context=context, reason=self._failure_reason)) else: notification_center.post_notification('MediaStreamDidStart', sender=self) finally: self.greenlet = None def deactivate(self): self.shutting_down = True @run_in_green_thread def end(self): if self._done: return self._done = True notification_center = NotificationCenter() if not self._initialize_done: # we are in the middle of initialize() try: msrp_connector = self.msrp_connector if self.greenlet is not None: api.kill(self.greenlet) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted')) notification_center.remove_observer(self, sender=self) self.msrp_connector = None self.greenlet = None else: notification_center.post_notification('MediaStreamWillEnd', sender=self) msrp = self.msrp msrp_session = self.msrp_session msrp_connector = self.msrp_connector try: if self.greenlet is not None: api.kill(self.greenlet) if msrp_session is not None: msrp_session.shutdown() elif msrp is not None: msrp.loseConnection(wait=False) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) notification_center.remove_observer(self, sender=self) self.msrp = None self.msrp_session = None self.msrp_connector = None self.session = None self.greenlet = None # noinspection PyMethodMayBeStatic,PyUnusedLocal def validate_update(self, remote_sdp, stream_index): return True # TODO def update(self, local_sdp, remote_sdp, stream_index): pass # TODO def hold(self): pass def unhold(self): pass def reset(self, stream_index): pass # Internal IObserver interface def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Internal message handlers def _handle_incoming(self, chunk=None, error=None): notification_center = NotificationCenter() if error is not None: if self.shutting_down and isinstance(error.value, ConnectionDone): return self._failure_reason = error.getErrorMessage() notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) elif chunk is not None: method_handler = getattr(self, '_handle_%s' % chunk.method, None) if method_handler is not None: method_handler(chunk) def _handle_REPORT(self, chunk): pass def _handle_SEND(self, chunk): pass # temporary solution. to be replaced later by a better logging system in msrplib -Dan # class ChunkInfo(object): __slots__ = 'content_type', 'header', 'footer', 'data' def __init__(self, content_type, header='', footer='', data=''): self.content_type = content_type self.header = header self.footer = footer self.data = data def __repr__(self): return "{0.__class__.__name__}(content_type={0.content_type!r}, header={0.header!r}, footer={0.footer!r}, data={0.data!r})".format(self) @property def content(self): return self.header + self.data + self.footer @property def normalized_content(self): if not self.data: return self.header + self.footer elif self.content_type == 'message/cpim': headers, sep, body = self.data.partition('\r\n\r\n') if not sep: return self.header + self.data + self.footer mime_headers, mime_sep, mime_body = body.partition('\n\n') if not mime_sep: return self.header + self.data + self.footer for mime_header in mime_headers.lower().splitlines(): if mime_header.startswith('content-type:'): wrapped_content_type = mime_header[13:].partition(';')[0].strip() break else: wrapped_content_type = None if wrapped_content_type is None or wrapped_content_type == 'application/im-iscomposing+xml' or wrapped_content_type.startswith(('text/', 'message/')): data = self.data else: data = headers + sep + mime_headers + mime_sep + '<<>>' return self.header + data + self.footer elif self.content_type is None or self.content_type == 'application/im-iscomposing+xml' or self.content_type.startswith(('text/', 'message/')): return self.header + self.data + self.footer else: return self.header + '<<>>' + self.footer class NotificationProxyLogger(object): def __init__(self): from application import log self.level = log.level self.notification_center = NotificationCenter() self.log_settings = SIPSimpleSettings().logs def received_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.chunk_header, footer=data.chunk_footer, data=data.data) notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def sent_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.encoded_header, footer=data.encoded_footer, data=data.data) notification_data = NotificationData(direction='outgoing', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def received_illegal_data(self, data, transport): if self.log_settings.trace_msrp: notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=data, illegal=True) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def debug(self, message, *args, **kw): pass def info(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.INFO)) def warning(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.WARNING)) warn = warning def error(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.ERROR)) def exception(self, message='', *args, **kw): if self.log_settings.trace_msrp: message = message % args if args else message exception = traceback.format_exc() self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message + '\n' + exception if message else exception, level=self.level.ERROR)) def critical(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.CRITICAL)) fatal = critical from sipsimple.streams.msrp import chat, filetransfer, screensharing diff --git a/sipsimple/streams/msrp/chat.py b/sipsimple/streams/msrp/chat.py index 36d33cc6..437b2a49 100644 --- a/sipsimple/streams/msrp/chat.py +++ b/sipsimple/streams/msrp/chat.py @@ -1,900 +1,900 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate Instant Messaging, including CPIM as defined in RFC3862 """ import pickle as pickle import codecs import os import random import re from application.python.descriptor import WriteOnceAttribute from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from application.system import openfile from collections import defaultdict from email.message import Message as EmailMessage from email.parser import Parser as EmailParser from eventlib.coros import queue from eventlib.proc import spawn, ProcExit from functools import partial from msrplib.protocol import FailureReportHeader, SuccessReportHeader, UseNicknameHeader from msrplib.session import MSRPSession, contains_mime_type from otr import OTRSession, OTRTransport, OTRState, SMPStatus from otr.cryptography import DSAPrivateKey from otr.exceptions import IgnoreMessage, UnencryptedMessage, EncryptedMessageError, OTRError from zope.interface import implementer from sipsimple.core import SIPURI, BaseSIPURI from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType from sipsimple.storage import ISIPSimpleApplicationDataStorage from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamError, MSRPStreamBase from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import MultilingualText, ISOTimestamp __all__ = ['ChatStream', 'ChatStreamError', 'ChatIdentity', 'CPIMPayload', 'CPIMHeader', 'CPIMNamespace', 'CPIMParserError', 'OTRState', 'SMPStatus'] class OTRTrustedPeer(object): fingerprint = WriteOnceAttribute() # in order to be hashable this needs to be immutable def __init__(self, fingerprint, description='', **kw): if not isinstance(fingerprint, str): raise TypeError("fingerprint must be a string") self.fingerprint = fingerprint self.description = description self.__dict__.update(kw) def __hash__(self): return hash(self.fingerprint) def __eq__(self, other): if isinstance(other, OTRTrustedPeer): return self.fingerprint == other.fingerprint elif isinstance(other, str): return self.fingerprint == other else: return NotImplemented def __ne__(self, other): return not (self == other) def __repr__(self): return "{0.__class__.__name__}({0.fingerprint!r}, description={0.description!r})".format(self) def __reduce__(self): return self.__class__, (self.fingerprint,), self.__dict__ class OTRTrustedPeerSet(object): def __init__(self, iterable=()): self.__data__ = {} self.update(iterable) def __repr__(self): return "{}({})".format(self.__class__.__name__, list(self.__data__.values())) def __contains__(self, item): return item in self.__data__ def __getitem__(self, item): return self.__data__[item] def __iter__(self): return iter(list(self.__data__.values())) def __len__(self): return len(self.__data__) def get(self, item, default=None): return self.__data__.get(item, default) def add(self, item): if not isinstance(item, OTRTrustedPeer): raise TypeError("item should be and instance of OTRTrustedPeer") self.__data__[item.fingerprint] = item def remove(self, item): del self.__data__[item] def discard(self, item): self.__data__.pop(item, None) def update(self, iterable=()): for item in iterable: self.add(item) class OTRCache(object, metaclass=Singleton): def __init__(self): from sipsimple.application import SIPApplication if SIPApplication.storage is None: raise RuntimeError("Cannot access the OTR cache before SIPApplication.storage is defined") if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage): self.key_file = os.path.join(SIPApplication.storage.directory, 'otr.key') self.trusted_file = os.path.join(SIPApplication.storage.directory, 'otr.trusted') try: self.private_key = DSAPrivateKey.load(self.key_file) if self.private_key.key_size != 1024: raise ValueError except (EnvironmentError, ValueError): self.private_key = DSAPrivateKey.generate() self.private_key.save(self.key_file) try: self.trusted_peers = pickle.load(open(self.trusted_file, 'rb')) if not isinstance(self.trusted_peers, OTRTrustedPeerSet) or not all(isinstance(item, OTRTrustedPeer) for item in self.trusted_peers): raise ValueError("invalid OTR trusted peers file") except Exception: self.trusted_peers = OTRTrustedPeerSet() self.save() else: self.key_file = self.trusted_file = None self.private_key = DSAPrivateKey.generate() self.trusted_peers = OTRTrustedPeerSet() # def generate_private_key(self): # self.private_key = DSAPrivateKey.generate() # if self.key_file: # self.private_key.save(self.key_file) @run_in_thread('file-io') def save(self): if self.trusted_file is not None: with openfile(self.trusted_file, 'wb', permissions=0o600) as trusted_file: pickle.dump(self.trusted_peers, trusted_file) @implementer(IObserver) class OTREncryption(object): def __init__(self, stream): self.stream = stream self.otr_cache = OTRCache() self.otr_session = OTRSession(self.otr_cache.private_key, self.stream, supported_versions={3}) # we need at least OTR-v3 for question based SMP notification_center = NotificationCenter() notification_center.add_observer(self, sender=stream) notification_center.add_observer(self, sender=self.otr_session) @property def active(self): try: return self.otr_session.encrypted except AttributeError: return False @property def cipher(self): return 'AES-128-CTR' if self.active else None @property def key_fingerprint(self): try: return self.otr_session.local_private_key.public_key.fingerprint except AttributeError: return None @property def peer_fingerprint(self): try: return self.otr_session.remote_public_key.fingerprint except AttributeError: return None @property def peer_name(self): try: return self.__dict__['peer_name'] except KeyError: trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None) if trusted_peer is None: return '' else: return self.__dict__.setdefault('peer_name', trusted_peer.description) @peer_name.setter def peer_name(self, name): old_name = self.peer_name new_name = self.__dict__['peer_name'] = name if old_name != new_name: trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None) if trusted_peer is not None: trusted_peer.description = new_name self.otr_cache.save() notification_center = NotificationCenter() notification_center.post_notification("ChatStreamOTRPeerNameChanged", sender=self.stream, data=NotificationData(name=name)) @property def verified(self): return self.peer_fingerprint in self.otr_cache.trusted_peers @verified.setter def verified(self, value): peer_fingerprint = self.peer_fingerprint old_verified = peer_fingerprint in self.otr_cache.trusted_peers new_verified = bool(value) if peer_fingerprint is None or new_verified == old_verified: return if new_verified: self.otr_cache.trusted_peers.add(OTRTrustedPeer(peer_fingerprint, description=self.peer_name)) else: self.otr_cache.trusted_peers.remove(peer_fingerprint) self.otr_cache.save() notification_center = NotificationCenter() notification_center.post_notification("ChatStreamOTRVerifiedStateChanged", sender=self.stream, data=NotificationData(verified=new_verified)) @run_in_twisted_thread def start(self): if self.otr_session is not None: self.otr_session.start() @run_in_twisted_thread def stop(self): if self.otr_session is not None: self.otr_session.stop() @run_in_twisted_thread def smp_verify(self, secret, question=None): self.otr_session.smp_verify(secret, question) @run_in_twisted_thread def smp_answer(self, secret): self.otr_session.smp_answer(secret) @run_in_twisted_thread def smp_abort(self): self.otr_session.smp_abort() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidStart(self, notification): if self.stream.start_otr: self.otr_session.start() def _NH_MediaStreamDidEnd(self, notification): notification.center.remove_observer(self, sender=self.stream) notification.center.remove_observer(self, sender=self.otr_session) self.otr_session.stop() self.otr_session = None self.stream = None _NH_MediaStreamDidNotInitialize = _NH_MediaStreamDidEnd def _NH_OTRSessionStateChanged(self, notification): notification.center.post_notification('ChatStreamOTREncryptionStateChanged', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidStart(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidStart', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidNotStart(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidNotStart', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidEnd(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidEnd', sender=self.stream, data=notification.data) class ChatStreamError(MSRPStreamError): pass class ChatStream(MSRPStreamBase): type = 'chat' priority = 1 msrp_session_class = MSRPSession media_type = 'message' accept_types = ['message/cpim', 'text/*', 'image/*', 'application/im-iscomposing+xml'] accept_wrapped_types = ['text/*', 'image/*', 'application/im-iscomposing+xml'] prefer_cpim = True start_otr = True def __init__(self): super(ChatStream, self).__init__(direction='sendrecv') self.message_queue = queue() self.sent_messages = set() self.incoming_queue = defaultdict(list) self.message_queue_thread = None self.encryption = OTREncryption(self) @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] - if remote_stream.media != 'message': + if remote_stream.media != b'message': raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP' - if remote_stream.transport != expected_transport: + if remote_stream.transport != expected_transport.encode(): raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport)) - if remote_stream.formats != ['*']: + if remote_stream.formats != [b'*']: raise InvalidStreamError("wrong format list specified") stream = cls() stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') - if remote_stream.direction != 'sendrecv': + if remote_stream.direction != b'sendrecv': raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction) - remote_accept_types = remote_stream.attributes.getfirst('accept-types') + remote_accept_types = remote_stream.attributes.getfirst(b'accept-types') if remote_accept_types is None: raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute") - if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.split()): + if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.decode().split()): raise InvalidStreamError("no compatible media types found") return stream @property def local_identity(self): try: return ChatIdentity(self.session.local_identity.uri, self.session.local_identity.display_name) except AttributeError: return None @property def remote_identity(self): try: return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) except AttributeError: return None @property def private_messages_allowed(self): return 'private-messages' in self.chatroom_capabilities @property def nickname_allowed(self): return 'nickname' in self.chatroom_capabilities @property def chatroom_capabilities(self): try: if self.cpim_enabled and self.session.remote_focus: return ' '.join(self.remote_media.attributes.getall('chatroom')).split() except AttributeError: pass return [] def _NH_MediaStreamDidStart(self, notification): self.message_queue_thread = spawn(self._message_queue_handler) def _NH_MediaStreamDidNotInitialize(self, notification): message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _NH_MediaStreamDidEnd(self, notification): if self.message_queue_thread is not None: self.message_queue_thread.kill() else: message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. if chunk.message_id in self.sent_messages: self.sent_messages.remove(chunk.message_id) notification_center = NotificationCenter() data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment) if chunk.status.code == 200: notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data) else: notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_SEND(self, chunk): if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return content_type = chunk.content_type.lower() if not contains_mime_type(self.accept_types, content_type): self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.contflag == '#': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 200, 'OK') return elif chunk.contflag == '+': self.incoming_queue[chunk.message_id].append(chunk.data) self.msrp_session.send_report(chunk, 200, 'OK') return else: data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data if content_type == 'message/cpim': try: payload = CPIMPayload.decode(data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return else: message = Message(**{name: getattr(payload, name) for name in Message.__slots__}) if not contains_mime_type(self.accept_wrapped_types, message.content_type): self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if message.timestamp is None: message.timestamp = ISOTimestamp.now() if message.sender is None: message.sender = self.remote_identity private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity else: payload = SimplePayload.decode(data, content_type) message = Message(payload.content, payload.content_type, sender=self.remote_identity, recipients=[self.local_identity], timestamp=ISOTimestamp.now()) private = False try: message.content = self.encryption.otr_session.handle_input(message.content.encode(), message.content_type) except IgnoreMessage: self.msrp_session.send_report(chunk, 200, 'OK') return except UnencryptedMessage: encrypted = False encryption_active = True except EncryptedMessageError as e: self.msrp_session.send_report(chunk, 400, str(e)) notification_center = NotificationCenter() notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e))) return except OTRError as e: self.msrp_session.send_report(chunk, 200, 'OK') notification_center = NotificationCenter() notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e))) return else: encrypted = encryption_active = self.encryption.active if payload.charset is not None: message.content = message.content.decode(payload.charset) elif payload.content_type.startswith('text/'): message.content.decode('utf8') notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: try: document = IsComposingDocument.parse(message.content) except ParserError as e: self.msrp_session.send_report(chunk, 400, str(e)) return self.msrp_session.send_report(chunk, 200, 'OK') data = NotificationData(state=document.state.value, refresh=document.refresh.value if document.refresh is not None else 120, content_type=document.content_type.value if document.content_type is not None else None, last_active=document.last_active.value if document.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, encrypted=encrypted, encryption_active=encryption_active) notification_center.post_notification('ChatStreamGotComposingIndication', sender=self, data=data) else: self.msrp_session.send_report(chunk, 200, 'OK') data = NotificationData(message=message, private=private, encrypted=encrypted, encryption_active=encryption_active) notification_center.post_notification('ChatStreamGotMessage', sender=self, data=data) def _on_transaction_response(self, message_id, response): if message_id in self.sent_messages and response.code != 200: self.sent_messages.remove(message_id) data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment) NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _on_nickname_transaction_response(self, message_id, response): notification_center = NotificationCenter() if response.code == 200: notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response)) else: notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)) def _message_queue_handler(self): notification_center = NotificationCenter() try: while True: message = self.message_queue.wait() if self.msrp_session is None: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) break try: if isinstance(message.content, str): message.content = message.content.encode('utf8') charset = 'utf8' else: charset = None if not isinstance(message, QueuedOTRInternalMessage): try: message.content = self.encryption.otr_session.handle_output(message.content, message.content_type) except OTRError as e: raise ChatStreamError(str(e)) message.sender = message.sender or self.local_identity message.recipients = message.recipients or [self.remote_identity] # check if we MUST use CPIM need_cpim = (message.sender != self.local_identity or message.recipients != [self.remote_identity] or message.courtesy_recipients or message.subject or message.timestamp or message.required or message.additional_headers) if need_cpim or not contains_mime_type(self.remote_accept_types, message.content_type): if not contains_mime_type(self.remote_accept_wrapped_types, message.content_type): raise ChatStreamError('Unsupported content_type for outgoing message: %r' % message.content_type) if not self.cpim_enabled: raise ChatStreamError('Additional message meta-data cannot be sent, because the CPIM wrapper is not used') if not self.private_messages_allowed and message.recipients != [self.remote_identity]: raise ChatStreamError('The remote end does not support private messages') if message.timestamp is None: message.timestamp = ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) elif self.prefer_cpim and self.cpim_enabled and contains_mime_type(self.remote_accept_wrapped_types, message.content_type): if message.timestamp is None: message.timestamp = ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) else: payload = SimplePayload(message.content, message.content_type, charset) except ChatStreamError as e: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0]) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) continue else: content, content_type = payload.encode() message_id = message.id notify_progress = message.notify_progress report = 'yes' if notify_progress else 'no' chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id) chunk.add_header(FailureReportHeader(report)) chunk.add_header(SuccessReportHeader(report)) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id)) except Exception as e: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e)) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) except ProcExit: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) raise else: if notify_progress: self.sent_messages.add(message_id) notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk)) finally: self.message_queue_thread = None while self.sent_messages: message_id = self.sent_messages.pop() data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) @run_in_twisted_thread def _enqueue_message(self, message): if self._done: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) else: self.message_queue.send(message) @run_in_green_thread def _set_local_nickname(self, nickname, message_id): if self.msrp_session is None: # should we generate ChatStreamDidNotSetNickname here? return chunk = self.msrp.make_request('NICKNAME') chunk.add_header(UseNicknameHeader(nickname or '')) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id)) except Exception as e: self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason)) def inject_otr_message(self, data): message = QueuedOTRInternalMessage(data) self._enqueue_message(message) def send_message(self, content, content_type='text/plain', recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): message = QueuedMessage(content, content_type, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers, notify_progress=True) self._enqueue_message(message) return message.id def send_composing_indication(self, state, refresh=None, last_active=None, recipients=None): content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text')) message = QueuedMessage(content, IsComposingDocument.content_type, recipients=recipients, notify_progress=False) self._enqueue_message(message) return message.id def set_local_nickname(self, nickname): if not self.nickname_allowed: raise ChatStreamError('Setting nickname is not supported') message_id = '%x' % random.getrandbits(64) self._set_local_nickname(nickname, message_id) return message_id OTRTransport.register(ChatStream) # Chat related objects, including CPIM support as defined in RFC3862 # class ChatIdentity(object): _format_re = re.compile(r'^(?:"?(?P[^<]*[^"\s])"?)?\s*<(?Psips?:.+)>$') def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name def __eq__(self, other): if isinstance(other, ChatIdentity): return self.uri.user == other.uri.user and self.uri.host == other.uri.host elif isinstance(other, BaseSIPURI): return self.uri.user == other.user and self.uri.host == other.host elif isinstance(other, str): try: other_uri = SIPURI.parse(other) except Exception: return False else: return self.uri.user == other_uri.user and self.uri.host == other_uri.host else: return NotImplemented def __ne__(self, other): return not (self == other) def __repr__(self): return '{0.__class__.__name__}(uri={0.uri!r}, display_name={0.display_name!r})'.format(self) def __str__(self): if self.display_name: return '{0.display_name} <{0.uri}>'.format(self) else: return '<{0.uri}>'.format(self) @classmethod def parse(cls, value): match = cls._format_re.match(value) if match is None: raise ValueError('Cannot parse identity value: %r' % value) return cls(SIPURI.parse(match.group('uri')), match.group('display_name')) class Message(object): __slots__ = 'content', 'content_type', 'sender', 'recipients', 'courtesy_recipients', 'subject', 'timestamp', 'required', 'additional_headers' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): self.content = content self.content_type = content_type self.sender = sender self.recipients = recipients or [] self.courtesy_recipients = courtesy_recipients or [] self.subject = subject self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None self.required = required or [] self.additional_headers = additional_headers or [] class QueuedMessage(Message): __slots__ = 'id', 'notify_progress' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, id=None, notify_progress=True): super(QueuedMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) self.id = id or '%x' % random.getrandbits(64) self.notify_progress = notify_progress class QueuedOTRInternalMessage(QueuedMessage): def __init__(self, content): super(QueuedOTRInternalMessage, self).__init__(content, 'text/plain', notify_progress=False) class SimplePayload(object): def __init__(self, content, content_type, charset=None): if not isinstance(content, bytes): raise TypeError("content should be an instance of bytes") self.content = content self.content_type = content_type self.charset = charset def encode(self): if self.charset is not None: return self.content, '{0.content_type}; charset="{0.charset}"'.format(self) else: return self.content, str(self.content_type) @classmethod def decode(cls, content, content_type): if not isinstance(content, bytes): raise TypeError("content should be an instance of bytes") type_helper = EmailParser().parsestr('Content-Type: {}'.format(content_type)) content_type = type_helper.get_content_type() charset = type_helper.get_content_charset() return cls(content, content_type, charset) class CPIMPayload(object): standard_namespace = 'urn:ietf:params:cpim-headers:' headers_re = re.compile(r'(?:([^:]+?)\.)?(.+?):\s*(.+?)(?:\r\n|$)') subject_re = re.compile(r'^(?:;lang=([a-z]{1,8}(?:-[a-z0-9]{1,8})*)\s+)?(.*)$') namespace_re = re.compile(r'^(?:(\S+) ?)?<(.*)>$') def __init__(self, content, content_type, charset=None, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): self.content = content self.content_type = content_type self.charset = charset self.sender = sender self.recipients = recipients or [] self.courtesy_recipients = courtesy_recipients or [] self.subject = subject if isinstance(subject, (MultilingualText, type(None))) else MultilingualText(subject) self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None self.required = required or [] self.additional_headers = additional_headers or [] def encode(self): namespaces = {'': CPIMNamespace(self.standard_namespace)} header_list = [] if self.sender is not None: header_list.append('From: {}'.format(self.sender)) header_list.extend('To: {}'.format(recipient) for recipient in self.recipients) header_list.extend('cc: {}'.format(recipient) for recipient in self.courtesy_recipients) if self.subject is not None: header_list.append('Subject: {}'.format(self.subject)) header_list.extend('Subject:;lang={} {}'.format(language, translation) for language, translation in list(self.subject.translations.items())) if self.timestamp is not None: header_list.append('DateTime: {}'.format(self.timestamp)) if self.required: header_list.append('Required: {}'.format(','.join(self.required))) for header in self.additional_headers: if namespaces.get(header.namespace.prefix) != header.namespace: if header.namespace.prefix: header_list.append('NS: {0.namespace.prefix} <{0.namespace}>'.format(header)) else: header_list.append('NS: <{0.namespace}>'.format(header)) namespaces[header.namespace.prefix] = header.namespace if header.namespace.prefix: header_list.append('{0.namespace.prefix}.{0.name}: {0.value}'.format(header)) else: header_list.append('{0.name}: {0.value}'.format(header)) headers = '\r\n'.join(header_list) mime_message = EmailMessage() mime_message.set_payload(self.content) mime_message.set_type(self.content_type) if self.charset is not None: mime_message.set_param('charset', self.charset) return headers + '\r\n\r\n' + mime_message.as_string(), 'message/cpim' @classmethod def decode(cls, message): headers, separator, body = message.partition('\r\n\r\n') if not separator: raise CPIMParserError('Invalid CPIM message') sender = None recipients = [] courtesy_recipients = [] subject = None timestamp = None required = [] additional_headers = [] namespaces = {'': CPIMNamespace(cls.standard_namespace)} subjects = {} for prefix, name, value in cls.headers_re.findall(headers): namespace = namespaces.get(prefix) if namespace is None or '.' in name: continue try: #value = value.decode('cpim-header') if namespace == cls.standard_namespace: if name == 'From': sender = ChatIdentity.parse(value) elif name == 'To': recipients.append(ChatIdentity.parse(value)) elif name == 'cc': courtesy_recipients.append(ChatIdentity.parse(value)) elif name == 'Subject': match = cls.subject_re.match(value) if match is None: raise ValueError('Illegal Subject header: %r' % value) lang, subject = match.groups() # language tags must be ASCII subjects[str(lang) if lang is not None else None] = subject elif name == 'DateTime': timestamp = ISOTimestamp(value) elif name == 'Required': required.extend(re.split(r'\s*,\s*', value)) elif name == 'NS': match = cls.namespace_re.match(value) if match is None: raise ValueError('Illegal NS header: %r' % value) prefix, uri = match.groups() namespaces[prefix] = CPIMNamespace(uri, prefix) else: additional_headers.append(CPIMHeader(name, namespace, value)) else: additional_headers.append(CPIMHeader(name, namespace, value)) except ValueError: pass if None in subjects: subject = MultilingualText(subjects.pop(None), **subjects) elif subjects: subject = MultilingualText(**subjects) mime_message = EmailParser().parsestr(body) content_type = mime_message.get_content_type() if content_type is None: raise CPIMParserError("CPIM message missing Content-Type MIME header") content = mime_message.get_payload() charset = mime_message.get_content_charset() return cls(content, content_type, charset, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) class CPIMParserError(Exception): pass class CPIMNamespace(str): def __new__(cls, value, prefix=''): obj = str.__new__(cls, value) obj.prefix = prefix return obj class CPIMHeader(object): def __init__(self, name, namespace, value): self.name = name self.namespace = namespace self.value = value class CPIMCodec(codecs.Codec): character_map = {c: '\\u{:04x}'.format(c) for c in list(range(32)) + [127]} character_map[ord('\\')] = '\\\\' @classmethod def encode(cls, input, errors='strict'): return input.translate(cls.character_map).encode('utf-8', errors), len(input) @classmethod def decode(cls, input, errors='strict'): return input.decode('utf-8', errors).encode('raw-unicode-escape', errors).decode('unicode-escape', errors), len(input) def cpim_codec_search(name): if name.lower() in ('cpim-header', 'cpim_header'): return codecs.CodecInfo(name='CPIM-header', encode=CPIMCodec.encode, decode=CPIMCodec.decode, incrementalencoder=codecs.IncrementalEncoder, incrementaldecoder=codecs.IncrementalDecoder, streamwriter=codecs.StreamWriter, streamreader=codecs.StreamReader) codecs.register(cpim_codec_search) del cpim_codec_search diff --git a/sipsimple/streams/msrp/filetransfer.py b/sipsimple/streams/msrp/filetransfer.py index d096e98d..225a02b2 100644 --- a/sipsimple/streams/msrp/filetransfer.py +++ b/sipsimple/streams/msrp/filetransfer.py @@ -1,735 +1,735 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate File Transfer. """ __all__ = ['FileTransferStream', 'FileSelector'] import pickle as pickle import hashlib import mimetypes import os import random import re import time import uuid from abc import ABCMeta, abstractmethod from application.notification import NotificationCenter, NotificationData, IObserver from application.python.threadpool import ThreadPool, run_in_threadpool from application.python.types import MarkerType from application.system import FileExistsError, makedirs, openfile, unlink from itertools import count from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader, IntegerHeaderType, MSRPNamedHeader, HeaderParsingError from msrplib.session import MSRPSession from msrplib.transport import make_response from queue import Queue from threading import Event, Lock from zope.interface import implementer from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute from sipsimple.storage import ISIPSimpleApplicationDataStorage from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase from sipsimple.threading import run_in_twisted_thread, run_in_thread from sipsimple.util import sha1 HASH = type(hashlib.sha1()) class RandomID(metaclass=MarkerType): pass class FileSelectorHash(str): _hash_re = re.compile(r'^sha-1(:[0-9A-F]{2}){20}$') _byte_re = re.compile(r'..') def __new__(cls, value): if isinstance(value, str): if value.startswith('sha1:'): # backward compatibility hack (sort of). value = 'sha-1' + value[len('sha1'):] if not cls._hash_re.match(value): raise ValueError("Invalid hash value: {!r}".format(value)) return super(FileSelectorHash, cls).__new__(cls, value) elif isinstance(value, (HASH, sha1)): return super(FileSelectorHash, cls).__new__(cls, cls.encode_hash(value)) else: raise ValueError("Invalid hash value: {!r}".format(value)) def __eq__(self, other): if isinstance(other, str): return super(FileSelectorHash, self).__eq__(other) elif isinstance(other, (HASH, sha1)) and other.name.lower() == 'sha1': return super(FileSelectorHash, self).__eq__(self.encode_hash(other)) else: return NotImplemented def __ne__(self, other): return not self == other @classmethod def encode_hash(cls, hash_instance): if hash_instance.name.lower() != 'sha1': raise TypeError("Invalid hash type: {.name} (only sha1 hashes are supported).".format(hash_instance)) # unexpected as it may be, using a regular expression is the fastest method to do this return 'sha-1:' + ':'.join(cls._byte_re.findall(hash_instance.hexdigest().upper())) class FileSelector(object): _name_re = re.compile(r'name:"([^"]+)"') _size_re = re.compile(r'size:(\d+)') _type_re = re.compile(r'type:([^ ]+)') _hash_re = re.compile(r'hash:([^ ]+)') def __init__(self, name=None, type=None, size=None, hash=None, fd=None): # If present, hash should be a sha1 object or a string in the form: sha-1:72:24:5F:E8:65:3D:DA:F3:71:36:2F:86:D4:71:91:3E:E4:A2:CE:2E # According to the specification, only sha1 is supported ATM. self.name = name self.type = type self.size = size self.hash = hash self.fd = fd @property def hash(self): return self.__dict__['hash'] @hash.setter def hash(self, value): self.__dict__['hash'] = None if value is None else FileSelectorHash(value) @classmethod def parse(cls, string): name_match = cls._name_re.search(string) size_match = cls._size_re.search(string) type_match = cls._type_re.search(string) hash_match = cls._hash_re.search(string) name = name_match and name_match.group(1).decode('utf-8') size = size_match and int(size_match.group(1)) type = type_match and type_match.group(1) hash = hash_match and hash_match.group(1) return cls(name, type, size, hash) @classmethod def for_file(cls, path, type=None, hash=None): name = str(path) fd = open(name, 'rb') size = os.fstat(fd.fileno()).st_size if type is None: mime_type, encoding = mimetypes.guess_type(name) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' return cls(name, type, size, hash, fd) @property def sdp_repr(self): items = [('name', self.name and '"%s"' % os.path.basename(self.name).encode('utf-8')), ('type', self.type), ('size', self.size), ('hash', self.hash)] return ' '.join('%s:%s' % (name, value) for name, value in items if value is not None) class UniqueFilenameGenerator(object): @classmethod def generate(cls, name): yield name prefix, extension = os.path.splitext(name) for x in count(1): yield "%s-%d%s" % (prefix, x, extension) class FileMetadataEntry(object): def __init__(self, hash, filename, partial_hash=None): self.hash = hash self.filename = filename self.mtime = os.path.getmtime(self.filename) self.partial_hash = partial_hash @classmethod def from_selector(cls, file_selector): return cls(file_selector.hash.lower(), file_selector.name) class FileTransfersMetadata(object): __filename__ = 'transfer_metadata' __lifetime__ = 60*60*24*7 def __init__(self): self.data = {} self.lock = Lock() self.loaded = False self.directory = None def _load(self): if self.loaded: return from sipsimple.application import SIPApplication if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage): self.directory = SIPApplication.storage.directory if self.directory is not None: try: with open(os.path.join(self.directory, self.__filename__), 'rb') as f: data = pickle.loads(f.read()) except Exception: data = {} now = time.time() for hash, entry in list(data.items()): try: mtime = os.path.getmtime(entry.filename) except OSError: data.pop(hash) else: if mtime != entry.mtime or now - mtime > self.__lifetime__: data.pop(hash) self.data.update(data) self.loaded = True @run_in_thread('file-io') def _save(self, data): if self.directory is not None: with open(os.path.join(self.directory, self.__filename__), 'wb') as f: f.write(data) def __enter__(self): self.lock.acquire() self._load() return self.data def __exit__(self, exc_type, exc_val, exc_tb): if None is exc_type is exc_val is exc_tb: self._save(pickle.dumps(self.data)) self.lock.release() @implementer(IObserver) class FileTransferHandler(object, metaclass=ABCMeta): threadpool = ThreadPool(name='FileTransfers', min_threads=0, max_threads=100) threadpool.start() def __init__(self): self.stream = None self.session = None self._started = False self._session_started = False self._initialize_done = False self._initialize_successful = False def initialize(self, stream, session): self.stream = stream self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=stream) notification_center.add_observer(self, sender=session) notification_center.add_observer(self, sender=self) @property def filename(self): return self.stream.file_selector.name if self.stream is not None else None @abstractmethod def start(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def process_chunk(self, chunk): raise NotImplementedError def __terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, sender=self) try: self.stream.file_selector.fd.close() except AttributeError: # when self.stream.file_selector.fd is None pass except IOError: # we can get this if we try to close while another thread is reading from it pass self.stream = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, None) if handler is not None: handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if not self._initialize_done: self.end() self.__terminate() def _NH_MediaStreamDidStart(self, notification): self._started = True self.start() def _NH_MediaStreamWillEnd(self, notification): if self._started: self.end() elif self._session_started: notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Refused')) def _NH_SIPSessionWillStart(self, notification): self._session_started = True def _NH_SIPSessionDidFail(self, notification): if not self._session_started and self._initialize_successful: if notification.data.code == 487: reason = 'Cancelled' else: reason = notification.data.reason or 'Failed' notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=reason)) def _NH_FileTransferHandlerDidInitialize(self, notification): self._initialize_done = True self._initialize_successful = True def _NH_FileTransferHandlerDidNotInitialize(self, notification): self._initialize_done = True self._initialize_successful = False def _NH_FileTransferHandlerDidEnd(self, notification): self.__terminate() class OffsetHeader(MSRPNamedHeader): name = 'Offset' type = IntegerHeaderType class EndTransfer(metaclass=MarkerType): pass class IncomingFileTransferHandler(FileTransferHandler): metadata = FileTransfersMetadata() def __init__(self): super(IncomingFileTransferHandler, self).__init__() self.hash = sha1() self.queue = Queue() self.offset = 0 self.received_chunks = 0 @property def save_directory(self): return self.__dict__.get('save_directory') @save_directory.setter def save_directory(self, value): if self.stream is not None: raise AttributeError('cannot set save_directory, transfer is in progress') self.__dict__['save_directory'] = value def initialize(self, stream, session): super(IncomingFileTransferHandler, self).initialize(stream, session) try: directory = self.save_directory or SIPSimpleSettings().file_transfer.directory.normalized makedirs(directory) with self.metadata as metadata: try: prev_file = metadata.pop(stream.file_selector.hash.lower()) mtime = os.path.getmtime(prev_file.filename) if mtime != prev_file.mtime: raise ValueError('file was modified') filename = os.path.join(directory, os.path.basename(stream.file_selector.name)) try: os.link(prev_file.filename, filename) except (AttributeError, OSError): stream.file_selector.name = prev_file.filename else: stream.file_selector.name = filename unlink(prev_file.filename) stream.file_selector.fd = openfile(stream.file_selector.name, 'ab') # open doesn't seek to END in append mode on win32 until first write, but openfile does self.offset = stream.file_selector.fd.tell() self.hash = prev_file.partial_hash except (KeyError, EnvironmentError, ValueError): for name in UniqueFilenameGenerator.generate(os.path.join(directory, os.path.basename(stream.file_selector.name))): try: stream.file_selector.fd = openfile(name, 'xb') except FileExistsError: continue else: stream.file_selector.name = name break except Exception as e: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) def end(self): self.queue.put(EndTransfer) def process_chunk(self, chunk): if chunk.method == 'SEND': if not self.received_chunks and chunk.byte_range.start == 1: self.stream.file_selector.fd.truncate(0) self.stream.file_selector.fd.seek(0) self.hash = sha1() self.offset = 0 self.received_chunks += 1 self.queue.put(chunk) elif chunk.method == 'FILE_OFFSET': if self.received_chunks > 0: response = make_response(chunk, 413, 'Unwanted message') else: offset = self.stream.file_selector.fd.tell() response = make_response(chunk, 200, 'OK') response.add_header(OffsetHeader(offset)) self.stream.msrp_session.send_chunk(response) @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) file_selector = self.stream.file_selector fd = file_selector.fd while True: chunk = self.queue.get() if chunk is EndTransfer: break try: fd.write(chunk.data) except EnvironmentError as e: fd.close() notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=str(e))) return self.hash.update(chunk.data) self.offset += chunk.size transferred_bytes = chunk.byte_range.start + chunk.size - 1 total_bytes = file_selector.size = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: break fd.close() # Transfer is finished if self.offset != self.stream.file_selector.size: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete file')) return if self.hash != self.stream.file_selector.hash: unlink(self.filename) # something got corrupted, better delete the file notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='File hash mismatch')) return notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) def _NH_MediaStreamDidNotInitialize(self, notification): if self.stream.file_selector.fd is not None: position = self.stream.file_selector.fd.tell() self.stream.file_selector.fd.close() if position == 0: unlink(self.stream.file_selector.name) super(IncomingFileTransferHandler, self)._NH_MediaStreamDidNotInitialize(notification) def _NH_FileTransferHandlerDidEnd(self, notification): if notification.data.error and self.stream.file_selector.hash is not None: if os.path.getsize(self.stream.file_selector.name) == 0: unlink(self.stream.file_selector.name) else: with self.metadata as metadata: entry = FileMetadataEntry.from_selector(self.stream.file_selector) entry.partial_hash = self.hash metadata[entry.hash] = entry super(IncomingFileTransferHandler, self)._NH_FileTransferHandlerDidEnd(notification) class OutgoingFileTransferHandler(FileTransferHandler): file_part_size = 64*1024 def __init__(self): super(OutgoingFileTransferHandler, self).__init__() self.stop_event = Event() self.finished_event = Event() self.file_offset_event = Event() self.message_id = '%x' % random.getrandbits(64) self.offset = 0 def initialize(self, stream, session): super(OutgoingFileTransferHandler, self).initialize(stream, session) if stream.file_selector.fd is None: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file descriptor not specified')) return if stream.file_selector.size == 0: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file is empty')) return if stream.file_selector.hash is None: self._calculate_file_hash() else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) @run_in_threadpool(FileTransferHandler.threadpool) def _calculate_file_hash(self): file_hash = hashlib.sha1() processed = 0 notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=0, total=self.stream.file_selector.size)) file_selector = self.stream.file_selector fd = file_selector.fd while not self.stop_event.is_set(): try: content = fd.read(self.file_part_size) except EnvironmentError as e: fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) return if not content: file_selector.hash = file_hash notification_center.post_notification('FileTransferHandlerDidInitialize', sender=self) break file_hash.update(content) processed += len(content) notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=processed, total=file_selector.size)) else: fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted transfer')) def end(self): self.stop_event.set() self.file_offset_event.set() # in case we are busy waiting on it @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) if self.stream.file_offset_supported: self._send_file_offset_chunk() self.file_offset_event.wait() finished = False failure_reason = None fd = self.stream.file_selector.fd fd.seek(self.offset) try: while not self.stop_event.is_set(): try: data = fd.read(self.file_part_size) except EnvironmentError as e: failure_reason = str(e) break if not data: finished = True break self._send_chunk(data) finally: fd.close() if not finished: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=failure_reason or 'Interrupted transfer')) return # Wait until the stream ends or we get all reports self.stop_event.wait() if self.finished_event.is_set(): notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) else: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete transfer')) def _on_transaction_response(self, response): if self.stop_event.is_set(): return if response.code != 200: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=response.comment)) self.end() @run_in_twisted_thread def _send_chunk(self, data): if self.stop_event.is_set(): return data_len = len(data) chunk = self.stream.msrp.make_send_request(message_id=self.message_id, data=data, start=self.offset+1, end=self.offset+data_len, length=self.stream.file_selector.size) chunk.add_header(ContentTypeHeader(self.stream.file_selector.type)) chunk.add_header(SuccessReportHeader('yes')) chunk.add_header(FailureReportHeader('yes')) try: self.stream.msrp_session.send_chunk(chunk, response_cb=self._on_transaction_response) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) else: self.offset += data_len @run_in_twisted_thread def _send_file_offset_chunk(self): def response_cb(response): if not self.stop_event.is_set() and response.code == 200: try: offset = response.headers['Offset'].decoded except (KeyError, HeaderParsingError): offset = 0 self.offset = offset self.file_offset_event.set() if self.stop_event.is_set(): self.file_offset_event.set() return chunk = self.stream.msrp.make_request('FILE_OFFSET') # TODO: _ is illegal in MSRP method names according to RFC 4975 try: self.stream.msrp_session.send_chunk(chunk, response_cb=response_cb) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) def process_chunk(self, chunk): # here we process the REPORT chunks notification_center = NotificationCenter() if chunk.status.code == 200: transferred_bytes = chunk.byte_range.end total_bytes = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: self.finished_event.set() self.end() else: notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=chunk.status.comment)) self.end() class FileTransferMSRPSession(MSRPSession): def _handle_incoming_FILE_OFFSET(self, chunk): self._on_incoming_cb(chunk) class FileTransferStream(MSRPStreamBase): type = 'file-transfer' priority = 10 msrp_session_class = FileTransferMSRPSession media_type = 'message' accept_types = ['*'] accept_wrapped_types = None IncomingTransferHandler = IncomingFileTransferHandler OutgoingTransferHandler = OutgoingFileTransferHandler def __init__(self, file_selector, direction, transfer_id=RandomID): if direction not in ('sendonly', 'recvonly'): raise ValueError("direction must be one of 'sendonly' or 'recvonly'") super(FileTransferStream, self).__init__(direction=direction) self.file_selector = file_selector self.transfer_id = transfer_id if transfer_id is not RandomID else str(uuid.uuid4()) if direction == 'sendonly': self.handler = self.OutgoingTransferHandler() else: self.handler = self.IncomingTransferHandler() @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] - if remote_stream.media != 'message' or 'file-selector' not in remote_stream.attributes: + if remote_stream.media != b'message' or b'file-selector' not in remote_stream.attributes: raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport == 'tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport: raise InvalidStreamError("expected %s transport in file transfer stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != ['*']: raise InvalidStreamError("wrong format list specified") try: file_selector = FileSelector.parse(remote_stream.attributes.getfirst('file-selector')) except Exception as e: raise InvalidStreamError("error parsing file-selector: {}".format(e)) transfer_id = remote_stream.attributes.getfirst('file-transfer-id', None) if remote_stream.direction == 'sendonly': stream = cls(file_selector, 'recvonly', transfer_id) elif remote_stream.direction == 'recvonly': stream = cls(file_selector, 'sendonly', transfer_id) else: raise InvalidStreamError("wrong stream direction specified") stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') return stream def initialize(self, session, direction): self._initialize_args = session, direction NotificationCenter().add_observer(self, sender=self.handler) self.handler.initialize(self, session) def _create_local_media(self, uri_path): local_media = super(FileTransferStream, self)._create_local_media(uri_path) - local_media.attributes.append(SDPAttribute('file-selector', self.file_selector.sdp_repr)) - local_media.attributes.append(SDPAttribute('x-file-offset', '')) + local_media.attributes.append(SDPAttribute(b'file-selector', self.file_selector.sdp_repr.encode())) + local_media.attributes.append(SDPAttribute(b'x-file-offset', b'')) if self.transfer_id is not None: - local_media.attributes.append(SDPAttribute('file-transfer-id', self.transfer_id)) + local_media.attributes.append(SDPAttribute(b'file-transfer-id', self.transfer_id.encode())) return local_media @property def file_offset_supported(self): try: return 'x-file-offset' in self.remote_media.attributes except AttributeError: return False @run_in_twisted_thread def _NH_FileTransferHandlerDidInitialize(self, notification): session, direction = self._initialize_args del self._initialize_args if not self._done: super(FileTransferStream, self).initialize(session, direction) @run_in_twisted_thread def _NH_FileTransferHandlerDidNotInitialize(self, notification): del self._initialize_args if not self._done: notification.center.post_notification('MediaStreamDidNotInitialize', sender=self, data=notification.data) @run_in_twisted_thread def _NH_FileTransferHandlerError(self, notification): self._failure_reason = notification.data.error notification.center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='transferring', reason=self._failure_reason)) def _NH_MediaStreamDidNotInitialize(self, notification): notification.center.remove_observer(self, sender=self.handler) def _NH_MediaStreamWillEnd(self, notification): notification.center.remove_observer(self, sender=self.handler) def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. self.handler.process_chunk(chunk) def _handle_SEND(self, chunk): notification_center = NotificationCenter() if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.content_type.lower() == 'message/cpim': # In order to properly support the CPIM wrapper, msrplib needs to be refactored. -Luci self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') self._failure_reason = "CPIM wrapper is not supported" notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) return try: self.msrp_session.send_report(chunk, 200, 'OK') except Exception: pass # Best effort approach: even if we couldn't send the REPORT keep writing the chunks, we might have them all -Saul self.handler.process_chunk(chunk) def _handle_FILE_OFFSET(self, chunk): if self.direction != 'recvonly': response = make_response(chunk, 413, 'Unwanted message') self.msrp_session.send_chunk(response) return self.handler.process_chunk(chunk) diff --git a/sipsimple/streams/msrp/screensharing.py b/sipsimple/streams/msrp/screensharing.py index d991fedb..11f215ec 100644 --- a/sipsimple/streams/msrp/screensharing.py +++ b/sipsimple/streams/msrp/screensharing.py @@ -1,351 +1,353 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate Screen Sharing. """ __all__ = ['ScreenSharingStream', 'VNCConnectionError', 'ScreenSharingHandler', 'ScreenSharingServerHandler', 'ScreenSharingViewerHandler', 'InternalVNCViewerHandler', 'InternalVNCServerHandler', 'ExternalVNCViewerHandler', 'ExternalVNCServerHandler'] from abc import ABCMeta, abstractmethod, abstractproperty from application.notification import NotificationCenter, NotificationData, IObserver from application.python.descriptor import WriteOnceAttribute from eventlib.coros import queue from eventlib.greenio import GreenSocket from eventlib.proc import spawn from eventlib.util import tcp_socket, set_reuse_addr from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader from msrplib.transport import make_response, make_report from twisted.internet.error import ConnectionDone from zope.interface import implementer from sipsimple.core import SDPAttribute from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase from sipsimple.threading import run_in_twisted_thread class VNCConnectionError(Exception): pass @implementer(IObserver) class ScreenSharingHandler(object, metaclass=ABCMeta): def __init__(self): self.incoming_msrp_queue = None self.outgoing_msrp_queue = None self.msrp_reader_thread = None self.msrp_writer_thread = None def initialize(self, stream): self.incoming_msrp_queue = stream.incoming_queue self.outgoing_msrp_queue = stream.outgoing_queue NotificationCenter().add_observer(self, sender=stream) @abstractproperty def type(self): raise NotImplementedError @abstractmethod def _msrp_reader(self): raise NotImplementedError @abstractmethod def _msrp_writer(self): raise NotImplementedError def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, None) if handler is not None: handler(notification) def _NH_MediaStreamDidStart(self, notification): self.msrp_reader_thread = spawn(self._msrp_reader) self.msrp_writer_thread = spawn(self._msrp_writer) def _NH_MediaStreamWillEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) if self.msrp_reader_thread is not None: self.msrp_reader_thread.kill() self.msrp_reader_thread = None if self.msrp_writer_thread is not None: self.msrp_writer_thread.kill() self.msrp_writer_thread = None class ScreenSharingServerHandler(ScreenSharingHandler): type = property(lambda self: 'passive') class ScreenSharingViewerHandler(ScreenSharingHandler): type = property(lambda self: 'active') class InternalVNCViewerHandler(ScreenSharingViewerHandler): @run_in_twisted_thread def send(self, data): self.outgoing_msrp_queue.send(data) def _msrp_reader(self): notification_center = NotificationCenter() while True: data = self.incoming_msrp_queue.wait() notification_center.post_notification('ScreenSharingStreamGotData', sender=self, data=NotificationData(data=data)) def _msrp_writer(self): pass class InternalVNCServerHandler(ScreenSharingServerHandler): @run_in_twisted_thread def send(self, data): self.outgoing_msrp_queue.send(data) def _msrp_reader(self): notification_center = NotificationCenter() while True: data = self.incoming_msrp_queue.wait() notification_center.post_notification('ScreenSharingStreamGotData', sender=self, data=NotificationData(data=data)) def _msrp_writer(self): pass class ExternalVNCViewerHandler(ScreenSharingViewerHandler): address = ('localhost', 0) connect_timeout = 5 def __init__(self): super(ExternalVNCViewerHandler, self).__init__() self.vnc_starter_thread = None self.vnc_socket = GreenSocket(tcp_socket()) set_reuse_addr(self.vnc_socket) self.vnc_socket.settimeout(self.connect_timeout) self.vnc_socket.bind(self.address) self.vnc_socket.listen(1) self.address = self.vnc_socket.getsockname() def _msrp_reader(self): while True: try: data = self.incoming_msrp_queue.wait() self.vnc_socket.sendall(data) except Exception as e: self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='sending', reason=str(e))) break def _msrp_writer(self): while True: try: data = self.vnc_socket.recv(2048) if not data: raise VNCConnectionError("connection with the VNC viewer was closed") self.outgoing_msrp_queue.send(data) except Exception as e: self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='reading', reason=str(e))) break def _start_vnc_connection(self): try: sock, addr = self.vnc_socket.accept() self.vnc_socket.close() self.vnc_socket = sock self.vnc_socket.settimeout(None) except Exception as e: self.vnc_starter_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='connecting', reason=str(e))) else: self.msrp_reader_thread = spawn(self._msrp_reader) self.msrp_writer_thread = spawn(self._msrp_writer) finally: self.vnc_starter_thread = None def _NH_MediaStreamDidStart(self, notification): self.vnc_starter_thread = spawn(self._start_vnc_connection) def _NH_MediaStreamWillEnd(self, notification): if self.vnc_starter_thread is not None: self.vnc_starter_thread.kill() self.vnc_starter_thread = None super(ExternalVNCViewerHandler, self)._NH_MediaStreamWillEnd(notification) self.vnc_socket.close() class ExternalVNCServerHandler(ScreenSharingServerHandler): address = ('localhost', 5900) connect_timeout = 5 def __init__(self): super(ExternalVNCServerHandler, self).__init__() self.vnc_starter_thread = None self.vnc_socket = None def _msrp_reader(self): while True: try: data = self.incoming_msrp_queue.wait() self.vnc_socket.sendall(data) except Exception as e: self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='sending', reason=str(e))) break def _msrp_writer(self): while True: try: data = self.vnc_socket.recv(2048) if not data: raise VNCConnectionError("connection to the VNC server was closed") self.outgoing_msrp_queue.send(data) except Exception as e: self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='reading', reason=str(e))) break def _start_vnc_connection(self): try: self.vnc_socket = GreenSocket(tcp_socket()) self.vnc_socket.settimeout(self.connect_timeout) self.vnc_socket.connect(self.address) self.vnc_socket.settimeout(None) except Exception as e: self.vnc_starter_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification NotificationCenter().post_notification('ScreenSharingHandlerDidFail', sender=self, data=NotificationData(context='connecting', reason=str(e))) else: self.msrp_reader_thread = spawn(self._msrp_reader) self.msrp_writer_thread = spawn(self._msrp_writer) finally: self.vnc_starter_thread = None def _NH_MediaStreamDidStart(self, notification): self.vnc_starter_thread = spawn(self._start_vnc_connection) def _NH_MediaStreamWillEnd(self, notification): if self.vnc_starter_thread is not None: self.vnc_starter_thread.kill() self.vnc_starter_thread = None super(ExternalVNCServerHandler, self)._NH_MediaStreamWillEnd(notification) if self.vnc_socket is not None: self.vnc_socket.close() class ScreenSharingStream(MSRPStreamBase): type = 'screen-sharing' priority = 1 media_type = 'application' accept_types = ['application/x-rfb'] accept_wrapped_types = None ServerHandler = InternalVNCServerHandler ViewerHandler = InternalVNCViewerHandler handler = WriteOnceAttribute() def __init__(self, mode): if mode not in ('viewer', 'server'): raise ValueError("mode should be 'viewer' or 'server' not '%s'" % mode) super(ScreenSharingStream, self).__init__(direction='sendrecv') self.handler = self.ViewerHandler() if mode=='viewer' else self.ServerHandler() self.incoming_queue = queue() self.outgoing_queue = queue() self.msrp_reader_thread = None self.msrp_writer_thread = None @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] - if remote_stream.media != 'application': + if remote_stream.media != b'application': raise UnknownStreamError accept_types = remote_stream.attributes.getfirst('accept-types', None) + accept_types = accept_types.decode() if accept_types else None if accept_types is None or 'application/x-rfb' not in accept_types.split(): raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP' - if remote_stream.transport != expected_transport: + if remote_stream.transport != expected_transport.encode(): raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport)) - if remote_stream.formats != ['*']: + if remote_stream.formats != [b'*']: raise InvalidStreamError("wrong format list specified") remote_rfbsetup = remote_stream.attributes.getfirst('rfbsetup', 'active') + remote_rfbsetup = remote_rfbsetup.decode() if remote_rfbsetup else None if remote_rfbsetup == 'active': stream = cls(mode='server') elif remote_rfbsetup == 'passive': stream = cls(mode='viewer') else: raise InvalidStreamError("unknown rfbsetup attribute in the remote screen sharing stream") - stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') + stream.remote_role = remote_stream.attributes.getfirst(b'setup', b'active') return stream def _create_local_media(self, uri_path): local_media = super(ScreenSharingStream, self)._create_local_media(uri_path) - local_media.attributes.append(SDPAttribute('rfbsetup', self.handler.type)) + local_media.attributes.append(SDPAttribute(b'rfbsetup', self.handler.type.encode())) return local_media def _msrp_reader(self): while True: try: chunk = self.msrp.read_chunk() if chunk.method in (None, 'REPORT'): continue elif chunk.method == 'SEND': if chunk.content_type in self.accept_types: self.incoming_queue.send(chunk.data) response = make_response(chunk, 200, 'OK') report = make_report(chunk, 200, 'OK') else: response = make_response(chunk, 415, 'Invalid Content-Type') report = None else: response = make_response(chunk, 501, 'Unknown method') report = None if response is not None: self.msrp.write_chunk(response) if report is not None: self.msrp.write_chunk(report) except Exception as e: self.msrp_reader_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification if self.shutting_down and isinstance(e, ConnectionDone): break self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) break def _msrp_writer(self): while True: try: data = self.outgoing_queue.wait() chunk = self.msrp.make_send_request(data=data) chunk.add_header(SuccessReportHeader('no')) chunk.add_header(FailureReportHeader('partial')) chunk.add_header(ContentTypeHeader('application/x-rfb')) self.msrp.write_chunk(chunk) except Exception as e: self.msrp_writer_thread = None # avoid issues caused by the notification handler killing this greenlet during post_notification if self.shutting_down and isinstance(e, ConnectionDone): break self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason)) break def _NH_MediaStreamDidInitialize(self, notification): notification.center.add_observer(self, sender=self.handler) self.handler.initialize(self) def _NH_MediaStreamDidStart(self, notification): self.msrp_reader_thread = spawn(self._msrp_reader) self.msrp_writer_thread = spawn(self._msrp_writer) def _NH_MediaStreamWillEnd(self, notification): notification.center.remove_observer(self, sender=self.handler) if self.msrp_reader_thread is not None: self.msrp_reader_thread.kill() self.msrp_reader_thread = None if self.msrp_writer_thread is not None: self.msrp_writer_thread.kill() self.msrp_writer_thread = None def _NH_ScreenSharingHandlerDidFail(self, notification): self._failure_reason = notification.data.reason notification.center.post_notification('MediaStreamDidFail', sender=self, data=notification.data)