diff --git a/sipsimple/streams/msrp/chat.py b/sipsimple/streams/msrp/chat.py index 437b2a49..59eca745 100644 --- a/sipsimple/streams/msrp/chat.py +++ b/sipsimple/streams/msrp/chat.py @@ -1,900 +1,900 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate Instant Messaging, including CPIM as defined in RFC3862 """ import pickle as pickle import codecs import os import random import re from application.python.descriptor import WriteOnceAttribute from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from application.python.types import Singleton from application.system import openfile from collections import defaultdict from email.message import Message as EmailMessage from email.parser import Parser as EmailParser from eventlib.coros import queue from eventlib.proc import spawn, ProcExit from functools import partial from msrplib.protocol import FailureReportHeader, SuccessReportHeader, UseNicknameHeader from msrplib.session import MSRPSession, contains_mime_type from otr import OTRSession, OTRTransport, OTRState, SMPStatus from otr.cryptography import DSAPrivateKey from otr.exceptions import IgnoreMessage, UnencryptedMessage, EncryptedMessageError, OTRError from zope.interface import implementer from sipsimple.core import SIPURI, BaseSIPURI from sipsimple.payloads import ParserError from sipsimple.payloads.iscomposing import IsComposingDocument, State, LastActive, Refresh, ContentType from sipsimple.storage import ISIPSimpleApplicationDataStorage from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamError, MSRPStreamBase from sipsimple.threading import run_in_thread, run_in_twisted_thread from sipsimple.threading.green import run_in_green_thread from sipsimple.util import MultilingualText, ISOTimestamp __all__ = ['ChatStream', 'ChatStreamError', 'ChatIdentity', 'CPIMPayload', 'CPIMHeader', 'CPIMNamespace', 'CPIMParserError', 'OTRState', 'SMPStatus'] class OTRTrustedPeer(object): fingerprint = WriteOnceAttribute() # in order to be hashable this needs to be immutable def __init__(self, fingerprint, description='', **kw): if not isinstance(fingerprint, str): raise TypeError("fingerprint must be a string") self.fingerprint = fingerprint self.description = description self.__dict__.update(kw) def __hash__(self): return hash(self.fingerprint) def __eq__(self, other): if isinstance(other, OTRTrustedPeer): return self.fingerprint == other.fingerprint elif isinstance(other, str): return self.fingerprint == other else: return NotImplemented def __ne__(self, other): return not (self == other) def __repr__(self): return "{0.__class__.__name__}({0.fingerprint!r}, description={0.description!r})".format(self) def __reduce__(self): return self.__class__, (self.fingerprint,), self.__dict__ class OTRTrustedPeerSet(object): def __init__(self, iterable=()): self.__data__ = {} self.update(iterable) def __repr__(self): return "{}({})".format(self.__class__.__name__, list(self.__data__.values())) def __contains__(self, item): return item in self.__data__ def __getitem__(self, item): return self.__data__[item] def __iter__(self): return iter(list(self.__data__.values())) def __len__(self): return len(self.__data__) def get(self, item, default=None): return self.__data__.get(item, default) def add(self, item): if not isinstance(item, OTRTrustedPeer): raise TypeError("item should be and instance of OTRTrustedPeer") self.__data__[item.fingerprint] = item def remove(self, item): del self.__data__[item] def discard(self, item): self.__data__.pop(item, None) def update(self, iterable=()): for item in iterable: self.add(item) class OTRCache(object, metaclass=Singleton): def __init__(self): from sipsimple.application import SIPApplication if SIPApplication.storage is None: raise RuntimeError("Cannot access the OTR cache before SIPApplication.storage is defined") if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage): self.key_file = os.path.join(SIPApplication.storage.directory, 'otr.key') self.trusted_file = os.path.join(SIPApplication.storage.directory, 'otr.trusted') try: self.private_key = DSAPrivateKey.load(self.key_file) if self.private_key.key_size != 1024: raise ValueError except (EnvironmentError, ValueError): self.private_key = DSAPrivateKey.generate() self.private_key.save(self.key_file) try: self.trusted_peers = pickle.load(open(self.trusted_file, 'rb')) if not isinstance(self.trusted_peers, OTRTrustedPeerSet) or not all(isinstance(item, OTRTrustedPeer) for item in self.trusted_peers): raise ValueError("invalid OTR trusted peers file") except Exception: self.trusted_peers = OTRTrustedPeerSet() self.save() else: self.key_file = self.trusted_file = None self.private_key = DSAPrivateKey.generate() self.trusted_peers = OTRTrustedPeerSet() # def generate_private_key(self): # self.private_key = DSAPrivateKey.generate() # if self.key_file: # self.private_key.save(self.key_file) @run_in_thread('file-io') def save(self): if self.trusted_file is not None: with openfile(self.trusted_file, 'wb', permissions=0o600) as trusted_file: pickle.dump(self.trusted_peers, trusted_file) @implementer(IObserver) class OTREncryption(object): def __init__(self, stream): self.stream = stream self.otr_cache = OTRCache() self.otr_session = OTRSession(self.otr_cache.private_key, self.stream, supported_versions={3}) # we need at least OTR-v3 for question based SMP notification_center = NotificationCenter() notification_center.add_observer(self, sender=stream) notification_center.add_observer(self, sender=self.otr_session) @property def active(self): try: return self.otr_session.encrypted except AttributeError: return False @property def cipher(self): return 'AES-128-CTR' if self.active else None @property def key_fingerprint(self): try: return self.otr_session.local_private_key.public_key.fingerprint except AttributeError: return None @property def peer_fingerprint(self): try: return self.otr_session.remote_public_key.fingerprint except AttributeError: return None @property def peer_name(self): try: return self.__dict__['peer_name'] except KeyError: trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None) if trusted_peer is None: return '' else: return self.__dict__.setdefault('peer_name', trusted_peer.description) @peer_name.setter def peer_name(self, name): old_name = self.peer_name new_name = self.__dict__['peer_name'] = name if old_name != new_name: trusted_peer = self.otr_cache.trusted_peers.get(self.peer_fingerprint, None) if trusted_peer is not None: trusted_peer.description = new_name self.otr_cache.save() notification_center = NotificationCenter() notification_center.post_notification("ChatStreamOTRPeerNameChanged", sender=self.stream, data=NotificationData(name=name)) @property def verified(self): return self.peer_fingerprint in self.otr_cache.trusted_peers @verified.setter def verified(self, value): peer_fingerprint = self.peer_fingerprint old_verified = peer_fingerprint in self.otr_cache.trusted_peers new_verified = bool(value) if peer_fingerprint is None or new_verified == old_verified: return if new_verified: self.otr_cache.trusted_peers.add(OTRTrustedPeer(peer_fingerprint, description=self.peer_name)) else: self.otr_cache.trusted_peers.remove(peer_fingerprint) self.otr_cache.save() notification_center = NotificationCenter() notification_center.post_notification("ChatStreamOTRVerifiedStateChanged", sender=self.stream, data=NotificationData(verified=new_verified)) @run_in_twisted_thread def start(self): if self.otr_session is not None: self.otr_session.start() @run_in_twisted_thread def stop(self): if self.otr_session is not None: self.otr_session.stop() @run_in_twisted_thread def smp_verify(self, secret, question=None): self.otr_session.smp_verify(secret, question) @run_in_twisted_thread def smp_answer(self, secret): self.otr_session.smp_answer(secret) @run_in_twisted_thread def smp_abort(self): self.otr_session.smp_abort() def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidStart(self, notification): if self.stream.start_otr: self.otr_session.start() def _NH_MediaStreamDidEnd(self, notification): notification.center.remove_observer(self, sender=self.stream) notification.center.remove_observer(self, sender=self.otr_session) self.otr_session.stop() self.otr_session = None self.stream = None _NH_MediaStreamDidNotInitialize = _NH_MediaStreamDidEnd def _NH_OTRSessionStateChanged(self, notification): notification.center.post_notification('ChatStreamOTREncryptionStateChanged', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidStart(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidStart', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidNotStart(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidNotStart', sender=self.stream, data=notification.data) def _NH_OTRSessionSMPVerificationDidEnd(self, notification): notification.center.post_notification('ChatStreamSMPVerificationDidEnd', sender=self.stream, data=notification.data) class ChatStreamError(MSRPStreamError): pass class ChatStream(MSRPStreamBase): type = 'chat' priority = 1 msrp_session_class = MSRPSession media_type = 'message' accept_types = ['message/cpim', 'text/*', 'image/*', 'application/im-iscomposing+xml'] accept_wrapped_types = ['text/*', 'image/*', 'application/im-iscomposing+xml'] prefer_cpim = True start_otr = True def __init__(self): super(ChatStream, self).__init__(direction='sendrecv') self.message_queue = queue() self.sent_messages = set() self.incoming_queue = defaultdict(list) self.message_queue_thread = None self.encryption = OTREncryption(self) @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] if remote_stream.media != b'message': raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport=='tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport.encode(): raise InvalidStreamError("expected %s transport in chat stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != [b'*']: raise InvalidStreamError("wrong format list specified") stream = cls() stream.remote_role = remote_stream.attributes.getfirst('setup', 'active') if remote_stream.direction != b'sendrecv': raise InvalidStreamError("Unsupported direction for chat stream: %s" % remote_stream.direction) remote_accept_types = remote_stream.attributes.getfirst(b'accept-types') if remote_accept_types is None: raise InvalidStreamError("remote SDP media does not have 'accept-types' attribute") if not any(contains_mime_type(cls.accept_types, mime_type) for mime_type in remote_accept_types.decode().split()): raise InvalidStreamError("no compatible media types found") return stream @property def local_identity(self): try: return ChatIdentity(self.session.local_identity.uri, self.session.local_identity.display_name) except AttributeError: return None @property def remote_identity(self): try: return ChatIdentity(self.session.remote_identity.uri, self.session.remote_identity.display_name) except AttributeError: return None @property def private_messages_allowed(self): return 'private-messages' in self.chatroom_capabilities @property def nickname_allowed(self): return 'nickname' in self.chatroom_capabilities @property def chatroom_capabilities(self): try: if self.cpim_enabled and self.session.remote_focus: return ' '.join(self.remote_media.attributes.getall('chatroom')).split() except AttributeError: pass return [] def _NH_MediaStreamDidStart(self, notification): self.message_queue_thread = spawn(self._message_queue_handler) def _NH_MediaStreamDidNotInitialize(self, notification): message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream was closed') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _NH_MediaStreamDidEnd(self, notification): if self.message_queue_thread is not None: self.message_queue_thread.kill() else: message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification.center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. if chunk.message_id in self.sent_messages: self.sent_messages.remove(chunk.message_id) notification_center = NotificationCenter() data = NotificationData(message_id=chunk.message_id, message=chunk, code=chunk.status.code, reason=chunk.status.comment) if chunk.status.code == 200: notification_center.post_notification('ChatStreamDidDeliverMessage', sender=self, data=data) else: notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _handle_SEND(self, chunk): if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return content_type = chunk.content_type.lower() if not contains_mime_type(self.accept_types, content_type): self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.contflag == '#': self.incoming_queue.pop(chunk.message_id, None) self.msrp_session.send_report(chunk, 200, 'OK') return elif chunk.contflag == '+': self.incoming_queue[chunk.message_id].append(chunk.data) self.msrp_session.send_report(chunk, 200, 'OK') return else: - data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data + data = ''.join(self.incoming_queue.pop(chunk.message_id, [])) + chunk.data.decode() if content_type == 'message/cpim': try: payload = CPIMPayload.decode(data) except CPIMParserError: self.msrp_session.send_report(chunk, 400, 'CPIM Parser Error') return else: message = Message(**{name: getattr(payload, name) for name in Message.__slots__}) if not contains_mime_type(self.accept_wrapped_types, message.content_type): self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if message.timestamp is None: message.timestamp = ISOTimestamp.now() if message.sender is None: message.sender = self.remote_identity private = self.session.remote_focus and len(message.recipients) == 1 and message.recipients[0] != self.remote_identity else: payload = SimplePayload.decode(data, content_type) message = Message(payload.content, payload.content_type, sender=self.remote_identity, recipients=[self.local_identity], timestamp=ISOTimestamp.now()) private = False try: message.content = self.encryption.otr_session.handle_input(message.content.encode(), message.content_type) except IgnoreMessage: self.msrp_session.send_report(chunk, 200, 'OK') return except UnencryptedMessage: encrypted = False encryption_active = True except EncryptedMessageError as e: self.msrp_session.send_report(chunk, 400, str(e)) notification_center = NotificationCenter() notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e))) return except OTRError as e: self.msrp_session.send_report(chunk, 200, 'OK') notification_center = NotificationCenter() notification_center.post_notification('ChatStreamOTRError', sender=self, data=NotificationData(error=str(e))) return else: encrypted = encryption_active = self.encryption.active if payload.charset is not None: message.content = message.content.decode(payload.charset) elif payload.content_type.startswith('text/'): message.content.decode('utf8') notification_center = NotificationCenter() if message.content_type.lower() == IsComposingDocument.content_type: try: document = IsComposingDocument.parse(message.content) except ParserError as e: self.msrp_session.send_report(chunk, 400, str(e)) return self.msrp_session.send_report(chunk, 200, 'OK') data = NotificationData(state=document.state.value, refresh=document.refresh.value if document.refresh is not None else 120, content_type=document.content_type.value if document.content_type is not None else None, last_active=document.last_active.value if document.last_active is not None else None, sender=message.sender, recipients=message.recipients, private=private, encrypted=encrypted, encryption_active=encryption_active) notification_center.post_notification('ChatStreamGotComposingIndication', sender=self, data=data) else: self.msrp_session.send_report(chunk, 200, 'OK') data = NotificationData(message=message, private=private, encrypted=encrypted, encryption_active=encryption_active) notification_center.post_notification('ChatStreamGotMessage', sender=self, data=data) def _on_transaction_response(self, message_id, response): if message_id in self.sent_messages and response.code != 200: self.sent_messages.remove(message_id) data = NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment) NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) def _on_nickname_transaction_response(self, message_id, response): notification_center = NotificationCenter() if response.code == 200: notification_center.post_notification('ChatStreamDidSetNickname', sender=self, data=NotificationData(message_id=message_id, response=response)) else: notification_center.post_notification('ChatStreamDidNotSetNickname', sender=self, data=NotificationData(message_id=message_id, message=response, code=response.code, reason=response.comment)) def _message_queue_handler(self): notification_center = NotificationCenter() try: while True: message = self.message_queue.wait() if self.msrp_session is None: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) break try: if isinstance(message.content, str): message.content = message.content.encode('utf8') charset = 'utf8' else: charset = None if not isinstance(message, QueuedOTRInternalMessage): try: message.content = self.encryption.otr_session.handle_output(message.content, message.content_type) except OTRError as e: raise ChatStreamError(str(e)) message.sender = message.sender or self.local_identity message.recipients = message.recipients or [self.remote_identity] # check if we MUST use CPIM need_cpim = (message.sender != self.local_identity or message.recipients != [self.remote_identity] or message.courtesy_recipients or message.subject or message.timestamp or message.required or message.additional_headers) if need_cpim or not contains_mime_type(self.remote_accept_types, message.content_type): if not contains_mime_type(self.remote_accept_wrapped_types, message.content_type): raise ChatStreamError('Unsupported content_type for outgoing message: %r' % message.content_type) if not self.cpim_enabled: raise ChatStreamError('Additional message meta-data cannot be sent, because the CPIM wrapper is not used') if not self.private_messages_allowed and message.recipients != [self.remote_identity]: raise ChatStreamError('The remote end does not support private messages') if message.timestamp is None: message.timestamp = ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) elif self.prefer_cpim and self.cpim_enabled and contains_mime_type(self.remote_accept_wrapped_types, message.content_type): if message.timestamp is None: message.timestamp = ISOTimestamp.now() payload = CPIMPayload(charset=charset, **{name: getattr(message, name) for name in Message.__slots__}) else: payload = SimplePayload(message.content, message.content_type, charset) except ChatStreamError as e: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason=e.args[0]) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) continue else: content, content_type = payload.encode() message_id = message.id notify_progress = message.notify_progress report = 'yes' if notify_progress else 'no' chunk = self.msrp_session.make_message(content, content_type=content_type, message_id=message_id) chunk.add_header(FailureReportHeader(report)) chunk.add_header(SuccessReportHeader(report)) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_transaction_response, message_id)) except Exception as e: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason=str(e)) notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) except ProcExit: if notify_progress: data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) raise else: if notify_progress: self.sent_messages.add(message_id) notification_center.post_notification('ChatStreamDidSendMessage', sender=self, data=NotificationData(message=chunk)) finally: self.message_queue_thread = None while self.sent_messages: message_id = self.sent_messages.pop() data = NotificationData(message_id=message_id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) message_queue, self.message_queue = self.message_queue, queue() while message_queue: message = message_queue.wait() if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') notification_center.post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) @run_in_twisted_thread def _enqueue_message(self, message): if self._done: if message.notify_progress: data = NotificationData(message_id=message.id, message=None, code=0, reason='Stream ended') NotificationCenter().post_notification('ChatStreamDidNotDeliverMessage', sender=self, data=data) else: self.message_queue.send(message) @run_in_green_thread def _set_local_nickname(self, nickname, message_id): if self.msrp_session is None: # should we generate ChatStreamDidNotSetNickname here? return chunk = self.msrp.make_request('NICKNAME') chunk.add_header(UseNicknameHeader(nickname or '')) try: self.msrp_session.send_chunk(chunk, response_cb=partial(self._on_nickname_transaction_response, message_id)) except Exception as e: self._failure_reason = str(e) NotificationCenter().post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='sending', reason=self._failure_reason)) def inject_otr_message(self, data): message = QueuedOTRInternalMessage(data) self._enqueue_message(message) def send_message(self, content, content_type='text/plain', recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): message = QueuedMessage(content, content_type, recipients=recipients, courtesy_recipients=courtesy_recipients, subject=subject, timestamp=timestamp, required=required, additional_headers=additional_headers, notify_progress=True) self._enqueue_message(message) return message.id def send_composing_indication(self, state, refresh=None, last_active=None, recipients=None): content = IsComposingDocument.create(state=State(state), refresh=Refresh(refresh) if refresh is not None else None, last_active=LastActive(last_active) if last_active is not None else None, content_type=ContentType('text')) message = QueuedMessage(content, IsComposingDocument.content_type, recipients=recipients, notify_progress=False) self._enqueue_message(message) return message.id def set_local_nickname(self, nickname): if not self.nickname_allowed: raise ChatStreamError('Setting nickname is not supported') message_id = '%x' % random.getrandbits(64) self._set_local_nickname(nickname, message_id) return message_id OTRTransport.register(ChatStream) # Chat related objects, including CPIM support as defined in RFC3862 # class ChatIdentity(object): _format_re = re.compile(r'^(?:"?(?P[^<]*[^"\s])"?)?\s*<(?Psips?:.+)>$') def __init__(self, uri, display_name=None): self.uri = uri self.display_name = display_name def __eq__(self, other): if isinstance(other, ChatIdentity): return self.uri.user == other.uri.user and self.uri.host == other.uri.host elif isinstance(other, BaseSIPURI): return self.uri.user == other.user and self.uri.host == other.host elif isinstance(other, str): try: other_uri = SIPURI.parse(other) except Exception: return False else: return self.uri.user == other_uri.user and self.uri.host == other_uri.host else: return NotImplemented def __ne__(self, other): return not (self == other) def __repr__(self): return '{0.__class__.__name__}(uri={0.uri!r}, display_name={0.display_name!r})'.format(self) def __str__(self): if self.display_name: return '{0.display_name} <{0.uri}>'.format(self) else: return '<{0.uri}>'.format(self) @classmethod def parse(cls, value): match = cls._format_re.match(value) if match is None: raise ValueError('Cannot parse identity value: %r' % value) return cls(SIPURI.parse(match.group('uri')), match.group('display_name')) class Message(object): __slots__ = 'content', 'content_type', 'sender', 'recipients', 'courtesy_recipients', 'subject', 'timestamp', 'required', 'additional_headers' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): self.content = content self.content_type = content_type self.sender = sender self.recipients = recipients or [] self.courtesy_recipients = courtesy_recipients or [] self.subject = subject self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None self.required = required or [] self.additional_headers = additional_headers or [] class QueuedMessage(Message): __slots__ = 'id', 'notify_progress' def __init__(self, content, content_type, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None, id=None, notify_progress=True): super(QueuedMessage, self).__init__(content, content_type, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) self.id = id or '%x' % random.getrandbits(64) self.notify_progress = notify_progress class QueuedOTRInternalMessage(QueuedMessage): def __init__(self, content): super(QueuedOTRInternalMessage, self).__init__(content, 'text/plain', notify_progress=False) class SimplePayload(object): def __init__(self, content, content_type, charset=None): if not isinstance(content, bytes): raise TypeError("content should be an instance of bytes") self.content = content self.content_type = content_type self.charset = charset def encode(self): if self.charset is not None: return self.content, '{0.content_type}; charset="{0.charset}"'.format(self) else: return self.content, str(self.content_type) @classmethod def decode(cls, content, content_type): if not isinstance(content, bytes): raise TypeError("content should be an instance of bytes") type_helper = EmailParser().parsestr('Content-Type: {}'.format(content_type)) content_type = type_helper.get_content_type() charset = type_helper.get_content_charset() return cls(content, content_type, charset) class CPIMPayload(object): standard_namespace = 'urn:ietf:params:cpim-headers:' headers_re = re.compile(r'(?:([^:]+?)\.)?(.+?):\s*(.+?)(?:\r\n|$)') subject_re = re.compile(r'^(?:;lang=([a-z]{1,8}(?:-[a-z0-9]{1,8})*)\s+)?(.*)$') namespace_re = re.compile(r'^(?:(\S+) ?)?<(.*)>$') def __init__(self, content, content_type, charset=None, sender=None, recipients=None, courtesy_recipients=None, subject=None, timestamp=None, required=None, additional_headers=None): self.content = content self.content_type = content_type self.charset = charset self.sender = sender self.recipients = recipients or [] self.courtesy_recipients = courtesy_recipients or [] self.subject = subject if isinstance(subject, (MultilingualText, type(None))) else MultilingualText(subject) self.timestamp = ISOTimestamp(timestamp) if timestamp is not None else None self.required = required or [] self.additional_headers = additional_headers or [] def encode(self): namespaces = {'': CPIMNamespace(self.standard_namespace)} header_list = [] if self.sender is not None: header_list.append('From: {}'.format(self.sender)) header_list.extend('To: {}'.format(recipient) for recipient in self.recipients) header_list.extend('cc: {}'.format(recipient) for recipient in self.courtesy_recipients) if self.subject is not None: header_list.append('Subject: {}'.format(self.subject)) header_list.extend('Subject:;lang={} {}'.format(language, translation) for language, translation in list(self.subject.translations.items())) if self.timestamp is not None: header_list.append('DateTime: {}'.format(self.timestamp)) if self.required: header_list.append('Required: {}'.format(','.join(self.required))) for header in self.additional_headers: if namespaces.get(header.namespace.prefix) != header.namespace: if header.namespace.prefix: header_list.append('NS: {0.namespace.prefix} <{0.namespace}>'.format(header)) else: header_list.append('NS: <{0.namespace}>'.format(header)) namespaces[header.namespace.prefix] = header.namespace if header.namespace.prefix: header_list.append('{0.namespace.prefix}.{0.name}: {0.value}'.format(header)) else: header_list.append('{0.name}: {0.value}'.format(header)) headers = '\r\n'.join(header_list) mime_message = EmailMessage() mime_message.set_payload(self.content) mime_message.set_type(self.content_type) if self.charset is not None: mime_message.set_param('charset', self.charset) return headers + '\r\n\r\n' + mime_message.as_string(), 'message/cpim' @classmethod def decode(cls, message): headers, separator, body = message.partition('\r\n\r\n') if not separator: raise CPIMParserError('Invalid CPIM message') sender = None recipients = [] courtesy_recipients = [] subject = None timestamp = None required = [] additional_headers = [] namespaces = {'': CPIMNamespace(cls.standard_namespace)} subjects = {} for prefix, name, value in cls.headers_re.findall(headers): namespace = namespaces.get(prefix) if namespace is None or '.' in name: continue try: #value = value.decode('cpim-header') if namespace == cls.standard_namespace: if name == 'From': sender = ChatIdentity.parse(value) elif name == 'To': recipients.append(ChatIdentity.parse(value)) elif name == 'cc': courtesy_recipients.append(ChatIdentity.parse(value)) elif name == 'Subject': match = cls.subject_re.match(value) if match is None: raise ValueError('Illegal Subject header: %r' % value) lang, subject = match.groups() # language tags must be ASCII subjects[str(lang) if lang is not None else None] = subject elif name == 'DateTime': timestamp = ISOTimestamp(value) elif name == 'Required': required.extend(re.split(r'\s*,\s*', value)) elif name == 'NS': match = cls.namespace_re.match(value) if match is None: raise ValueError('Illegal NS header: %r' % value) prefix, uri = match.groups() namespaces[prefix] = CPIMNamespace(uri, prefix) else: additional_headers.append(CPIMHeader(name, namespace, value)) else: additional_headers.append(CPIMHeader(name, namespace, value)) except ValueError: pass if None in subjects: subject = MultilingualText(subjects.pop(None), **subjects) elif subjects: subject = MultilingualText(**subjects) mime_message = EmailParser().parsestr(body) content_type = mime_message.get_content_type() if content_type is None: raise CPIMParserError("CPIM message missing Content-Type MIME header") content = mime_message.get_payload() charset = mime_message.get_content_charset() return cls(content, content_type, charset, sender, recipients, courtesy_recipients, subject, timestamp, required, additional_headers) class CPIMParserError(Exception): pass class CPIMNamespace(str): def __new__(cls, value, prefix=''): obj = str.__new__(cls, value) obj.prefix = prefix return obj class CPIMHeader(object): def __init__(self, name, namespace, value): self.name = name self.namespace = namespace self.value = value class CPIMCodec(codecs.Codec): character_map = {c: '\\u{:04x}'.format(c) for c in list(range(32)) + [127]} character_map[ord('\\')] = '\\\\' @classmethod def encode(cls, input, errors='strict'): return input.translate(cls.character_map).encode('utf-8', errors), len(input) @classmethod def decode(cls, input, errors='strict'): return input.decode('utf-8', errors).encode('raw-unicode-escape', errors).decode('unicode-escape', errors), len(input) def cpim_codec_search(name): if name.lower() in ('cpim-header', 'cpim_header'): return codecs.CodecInfo(name='CPIM-header', encode=CPIMCodec.encode, decode=CPIMCodec.decode, incrementalencoder=codecs.IncrementalEncoder, incrementaldecoder=codecs.IncrementalDecoder, streamwriter=codecs.StreamWriter, streamreader=codecs.StreamReader) codecs.register(cpim_codec_search) del cpim_codec_search diff --git a/sipsimple/streams/msrp/filetransfer.py b/sipsimple/streams/msrp/filetransfer.py index 413635ec..ff616143 100644 --- a/sipsimple/streams/msrp/filetransfer.py +++ b/sipsimple/streams/msrp/filetransfer.py @@ -1,754 +1,755 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate File Transfer. """ __all__ = ['FileTransferStream', 'FileSelector'] import pickle as pickle import hashlib import mimetypes import os import random import re import time import uuid from abc import ABCMeta, abstractmethod from application.notification import NotificationCenter, NotificationData, IObserver from application.python.threadpool import ThreadPool, run_in_threadpool from application.python.types import MarkerType from application.system import FileExistsError, makedirs, openfile, unlink from itertools import count from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader, IntegerHeaderType, MSRPNamedHeader, HeaderParsingError from msrplib.session import MSRPSession from msrplib.transport import make_response from queue import Queue from threading import Event, Lock from zope.interface import implementer from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute from sipsimple.storage import ISIPSimpleApplicationDataStorage from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase from sipsimple.threading import run_in_twisted_thread, run_in_thread #sha1 is much faster on large file than python native implementation from sipsimple.util import sha1 HASH = type(hashlib.sha1()) class RandomID(metaclass=MarkerType): pass class FileSelectorHash(str): _hash_re = re.compile(r'^sha-1(:[0-9A-F]{2}){20}$') _byte_re = re.compile(r'..') def __new__(cls, value): if isinstance(value, str): if value.startswith('sha1:'): # backward compatibility hack (sort of). value = 'sha-1' + value[len('sha1'):] if not cls._hash_re.match(value): raise ValueError("Invalid hash value: {!r}".format(value)) return super(FileSelectorHash, cls).__new__(cls, value) elif isinstance(value, (HASH, sha1)): return super(FileSelectorHash, cls).__new__(cls, cls.encode_hash(value)) else: raise ValueError("Invalid hash value: {!r}".format(value)) def __eq__(self, other): if isinstance(other, str): return super(FileSelectorHash, self).__eq__(other) elif isinstance(other, (HASH, sha1)) and other.name.lower() == 'sha1': return super(FileSelectorHash, self).__eq__(self.encode_hash(other)) else: return NotImplemented def __ne__(self, other): return not self == other @classmethod def encode_hash(cls, hash_instance): if hash_instance.name.lower() != 'sha1': raise TypeError("Invalid hash type: {.name} (only sha1 hashes are supported).".format(hash_instance)) # unexpected as it may be, using a regular expression is the fastest method to do this return 'sha-1:' + ':'.join(cls._byte_re.findall(hash_instance.hexdigest().upper())) class FileSelector(object): _name_re = re.compile(r'name:"([^"]+)"') _size_re = re.compile(r'size:(\d+)') _type_re = re.compile(r'type:([^ ]+)') _hash_re = re.compile(r'hash:([^ ]+)') def __init__(self, name=None, type=None, size=None, hash=None, fd=None): # If present, hash should be a sha1 object or a string in the form: sha-1:72:24:5F:E8:65:3D:DA:F3:71:36:2F:86:D4:71:91:3E:E4:A2:CE:2E # According to the specification, only sha1 is supported ATM. self.name = name self.type = type self.size = size self.hash = hash self.fd = fd @property def hash(self): return self.__dict__['hash'] @hash.setter def hash(self, value): self.__dict__['hash'] = None if value is None else FileSelectorHash(value) @classmethod def parse(cls, string): name_match = cls._name_re.search(string) size_match = cls._size_re.search(string) type_match = cls._type_re.search(string) hash_match = cls._hash_re.search(string) name = name_match and name_match.group(1) size = size_match and int(size_match.group(1)) type = type_match and type_match.group(1) hash = hash_match and hash_match.group(1) return cls(name, type, size, hash) @classmethod def for_file(cls, path, type=None, hash=None): name = str(path) fd = open(name, 'rb') size = os.fstat(fd.fileno()).st_size if type is None: mime_type, encoding = mimetypes.guess_type(name) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' return cls(name, type, size, hash, fd) @property def sdp_repr(self): - items = [('name', self.name and '"%s"' % os.path.basename(self.name).encode('utf-8')), ('type', self.type), ('size', self.size), ('hash', self.hash)] - return ' '.join('%s:%s' % (name, value) for name, value in items if value is not None) + items = [('name', self.name and '"%s"' % os.path.basename(self.name)), ('type', self.type), ('size', self.size), ('hash', self.hash)] + sdp = ' '.join('%s:%s' % (name, value) for name, value in items if value is not None) + return sdp.encode() class UniqueFilenameGenerator(object): @classmethod def generate(cls, name): yield name prefix, extension = os.path.splitext(name) for x in count(1): yield "%s-%d%s" % (prefix, x, extension) class FileMetadataEntry(object): def __init__(self, hash, filename, partial_hash=None): self.hash = hash self.filename = filename self.mtime = os.path.getmtime(self.filename) self.partial_hash = partial_hash @classmethod def from_selector(cls, file_selector): return cls(file_selector.hash.lower(), file_selector.name) class FileTransfersMetadata(object): __filename__ = 'transfer_metadata' __lifetime__ = 60*60*24*7 def __init__(self): self.data = {} self.lock = Lock() self.loaded = False self.directory = None def _load(self): if self.loaded: return from sipsimple.application import SIPApplication if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage): self.directory = SIPApplication.storage.directory if self.directory is not None: try: with open(os.path.join(self.directory, self.__filename__), 'rb') as f: data = pickle.loads(f.read()) except Exception: data = {} now = time.time() for hash, entry in list(data.items()): try: mtime = os.path.getmtime(entry.filename) except OSError: data.pop(hash) else: if mtime != entry.mtime or now - mtime > self.__lifetime__: data.pop(hash) self.data.update(data) self.loaded = True @run_in_thread('file-io') def _save(self, data): if self.directory is not None: with open(os.path.join(self.directory, self.__filename__), 'wb') as f: f.write(data) def __enter__(self): self.lock.acquire() self._load() return self.data def __exit__(self, exc_type, exc_val, exc_tb): if None is exc_type is exc_val is exc_tb: self._save(pickle.dumps(self.data)) self.lock.release() @implementer(IObserver) class FileTransferHandler(object, metaclass=ABCMeta): threadpool = ThreadPool(name='FileTransfers', min_threads=0, max_threads=100) threadpool.start() def __init__(self): self.stream = None self.session = None self._started = False self._session_started = False self._initialize_done = False self._initialize_successful = False def initialize(self, stream, session): self.stream = stream self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=stream) notification_center.add_observer(self, sender=session) notification_center.add_observer(self, sender=self) @property def filename(self): return self.stream.file_selector.name if self.stream is not None else None @abstractmethod def start(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def process_chunk(self, chunk): raise NotImplementedError def __terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, sender=self) try: self.stream.file_selector.fd.close() except AttributeError: # when self.stream.file_selector.fd is None pass except IOError: # we can get this if we try to close while another thread is reading from it pass self.stream = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, None) if handler is not None: handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if not self._initialize_done: self.end() self.__terminate() def _NH_MediaStreamDidStart(self, notification): self._started = True self.start() def _NH_MediaStreamWillEnd(self, notification): if self._started: self.end() elif self._session_started: notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Refused')) def _NH_SIPSessionWillStart(self, notification): self._session_started = True def _NH_SIPSessionDidFail(self, notification): if not self._session_started and self._initialize_successful: if notification.data.code == 487: reason = 'Cancelled' else: reason = notification.data.reason or 'Failed' notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=reason)) def _NH_FileTransferHandlerDidInitialize(self, notification): self._initialize_done = True self._initialize_successful = True def _NH_FileTransferHandlerDidNotInitialize(self, notification): self._initialize_done = True self._initialize_successful = False def _NH_FileTransferHandlerDidEnd(self, notification): self.__terminate() class OffsetHeader(MSRPNamedHeader): name = 'Offset' type = IntegerHeaderType class EndTransfer(metaclass=MarkerType): pass class IncomingFileTransferHandler(FileTransferHandler): metadata = FileTransfersMetadata() def __init__(self): super(IncomingFileTransferHandler, self).__init__() #self.hash = sha1() #TODO this fails - adi self.hash = hashlib.sha1() self.queue = Queue() self.offset = 0 self.received_chunks = 0 @property def save_directory(self): return self.__dict__.get('save_directory') @save_directory.setter def save_directory(self, value): if self.stream is not None: raise AttributeError('cannot set save_directory, transfer is in progress') self.__dict__['save_directory'] = value def initialize(self, stream, session): super(IncomingFileTransferHandler, self).initialize(stream, session) try: directory = self.save_directory or SIPSimpleSettings().file_transfer.directory.normalized makedirs(directory) with self.metadata as metadata: try: prev_file = metadata.pop(stream.file_selector.hash.lower()) mtime = os.path.getmtime(prev_file.filename) if mtime != prev_file.mtime: raise ValueError('file was modified') filename = os.path.join(directory, os.path.basename(stream.file_selector.name)) try: os.link(prev_file.filename, filename) except (AttributeError, OSError): stream.file_selector.name = prev_file.filename else: stream.file_selector.name = filename unlink(prev_file.filename) stream.file_selector.fd = openfile(stream.file_selector.name, 'ab') # open doesn't seek to END in append mode on win32 until first write, but openfile does self.offset = stream.file_selector.fd.tell() self.hash = prev_file.partial_hash except (KeyError, EnvironmentError, ValueError): for name in UniqueFilenameGenerator.generate(os.path.join(directory, os.path.basename(stream.file_selector.name))): try: stream.file_selector.fd = openfile(name, 'xb') except FileExistsError: continue else: stream.file_selector.name = name break except Exception as e: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) def end(self): self.queue.put(EndTransfer) def process_chunk(self, chunk): if chunk.method == 'SEND': if not self.received_chunks and chunk.byte_range.start == 1: self.stream.file_selector.fd.truncate(0) self.stream.file_selector.fd.seek(0) #self.hash = sha1() #TODO this fails - adi self.hash = hashlib.sha1() self.offset = 0 self.received_chunks += 1 self.queue.put(chunk) elif chunk.method == 'FILE_OFFSET': if self.received_chunks > 0: response = make_response(chunk, 413, 'Unwanted message') else: offset = self.stream.file_selector.fd.tell() response = make_response(chunk, 200, 'OK') response.add_header(OffsetHeader(offset)) self.stream.msrp_session.send_chunk(response) @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) file_selector = self.stream.file_selector fd = file_selector.fd while True: chunk = self.queue.get() if chunk is EndTransfer: break - data = chunk.data.encode() + data = chunk.data try: fd.write(data) except EnvironmentError as e: fd.close() notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=str(e))) return self.hash.update(data) self.offset += chunk.size transferred_bytes = chunk.byte_range.start + chunk.size - 1 total_bytes = file_selector.size = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: break fd.close() # Transfer is finished if self.offset != self.stream.file_selector.size: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete file')) return if self.hash != self.stream.file_selector.hash: unlink(self.filename) # something got corrupted, better delete the file notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='File hash mismatch')) return notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) def _NH_MediaStreamDidNotInitialize(self, notification): if self.stream.file_selector.fd is not None: position = self.stream.file_selector.fd.tell() self.stream.file_selector.fd.close() if position == 0: unlink(self.stream.file_selector.name) super(IncomingFileTransferHandler, self)._NH_MediaStreamDidNotInitialize(notification) def _NH_FileTransferHandlerDidEnd(self, notification): if notification.data.error and self.stream.file_selector.hash is not None: if os.path.getsize(self.stream.file_selector.name) == 0: unlink(self.stream.file_selector.name) else: with self.metadata as metadata: entry = FileMetadataEntry.from_selector(self.stream.file_selector) entry.partial_hash = self.hash metadata[entry.hash] = entry super(IncomingFileTransferHandler, self)._NH_FileTransferHandlerDidEnd(notification) class OutgoingFileTransferHandler(FileTransferHandler): file_part_size = 64*1024 def __init__(self): super(OutgoingFileTransferHandler, self).__init__() self.stop_event = Event() self.finished_event = Event() self.file_offset_event = Event() self.message_id = '%x' % random.getrandbits(64) self.offset = 0 def initialize(self, stream, session): super(OutgoingFileTransferHandler, self).initialize(stream, session) if stream.file_selector.fd is None: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file descriptor not specified')) return if stream.file_selector.size == 0: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file is empty')) return if stream.file_selector.hash is None: self._calculate_file_hash() else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) @run_in_threadpool(FileTransferHandler.threadpool) def _calculate_file_hash(self): file_hash = hashlib.sha1() processed = 0 notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=0, total=self.stream.file_selector.size)) file_selector = self.stream.file_selector fd = file_selector.fd while not self.stop_event.is_set(): try: content = fd.read(self.file_part_size) except EnvironmentError as e: fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) return if not content: file_selector.hash = file_hash notification_center.post_notification('FileTransferHandlerDidInitialize', sender=self) break file_hash.update(content) processed += len(content) notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=processed, total=file_selector.size)) else: fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted transfer')) def end(self): self.stop_event.set() self.file_offset_event.set() # in case we are busy waiting on it @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) if self.stream.file_offset_supported: self._send_file_offset_chunk() self.file_offset_event.wait() finished = False failure_reason = None fd = self.stream.file_selector.fd fd.seek(self.offset) try: while not self.stop_event.is_set(): try: data = fd.read(self.file_part_size) except EnvironmentError as e: failure_reason = str(e) break if not data: finished = True break self._send_chunk(data) finally: fd.close() if not finished: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=failure_reason or 'Interrupted transfer')) return # Wait until the stream ends or we get all reports self.stop_event.wait() if self.finished_event.is_set(): notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) else: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete transfer')) def _on_transaction_response(self, response): if self.stop_event.is_set(): return if response.code != 200: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=response.comment)) self.end() @run_in_twisted_thread def _send_chunk(self, data): if self.stop_event.is_set(): return data_len = len(data) chunk = self.stream.msrp.make_send_request(message_id=self.message_id, data=data, start=self.offset+1, end=self.offset+data_len, length=self.stream.file_selector.size) chunk.add_header(ContentTypeHeader(self.stream.file_selector.type)) chunk.add_header(SuccessReportHeader('yes')) chunk.add_header(FailureReportHeader('yes')) try: self.stream.msrp_session.send_chunk(chunk, response_cb=self._on_transaction_response) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) else: self.offset += data_len @run_in_twisted_thread def _send_file_offset_chunk(self): def response_cb(response): if not self.stop_event.is_set() and response.code == 200: try: offset = response.headers['Offset'].decoded except (KeyError, HeaderParsingError): offset = 0 self.offset = offset self.file_offset_event.set() if self.stop_event.is_set(): self.file_offset_event.set() return chunk = self.stream.msrp.make_request('FILE_OFFSET') # TODO: _ is illegal in MSRP method names according to RFC 4975 try: self.stream.msrp_session.send_chunk(chunk, response_cb=response_cb) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) def process_chunk(self, chunk): # here we process the REPORT chunks notification_center = NotificationCenter() if chunk.status.code == 200: transferred_bytes = chunk.byte_range.end total_bytes = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: self.finished_event.set() self.end() else: notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=chunk.status.comment)) self.end() class FileTransferMSRPSession(MSRPSession): def _handle_incoming_FILE_OFFSET(self, chunk): self._on_incoming_cb(chunk) class FileTransferStream(MSRPStreamBase): type = 'file-transfer' priority = 10 msrp_session_class = FileTransferMSRPSession media_type = 'message' accept_types = ['*'] accept_wrapped_types = None IncomingTransferHandler = IncomingFileTransferHandler OutgoingTransferHandler = OutgoingFileTransferHandler def __init__(self, file_selector, direction, transfer_id=RandomID): if direction not in ('sendonly', 'recvonly'): raise ValueError("direction must be one of 'sendonly' or 'recvonly'") super(FileTransferStream, self).__init__(direction=direction) self.file_selector = file_selector self.transfer_id = transfer_id if transfer_id is not RandomID else str(uuid.uuid4()) if direction == 'sendonly': self.handler = self.OutgoingTransferHandler() else: self.handler = self.IncomingTransferHandler() @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] if remote_stream.media != b'message': raise UnknownStreamError if b'file-selector' not in remote_stream.attributes: raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport == 'tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport.encode(): raise InvalidStreamError("expected %s transport in file transfer stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != [b'*']: raise InvalidStreamError("wrong format list specified") file_selector_attr = remote_stream.attributes.getfirst(b'file-selector') try: file_selector = FileSelector.parse(file_selector_attr.decode()) except Exception as e: raise InvalidStreamError("error parsing file-selector: {}".format(e)) transfer_id = remote_stream.attributes.getfirst(b'file-transfer-id', None) transfer_id = transfer_id.decode() if transfer_id else None try: if remote_stream.direction == b'sendonly': stream = cls(file_selector, 'recvonly', transfer_id) elif remote_stream.direction == b'recvonly': stream = cls(file_selector, 'sendonly', transfer_id) else: raise InvalidStreamError("wrong stream direction specified") except Exception as e: traceback.print_exc() stream.remote_role = remote_stream.attributes.getfirst(b'setup', b'active') return stream def initialize(self, session, direction): self._initialize_args = session, direction NotificationCenter().add_observer(self, sender=self.handler) self.handler.initialize(self, session) def _create_local_media(self, uri_path): local_media = super(FileTransferStream, self)._create_local_media(uri_path) - local_media.attributes.append(SDPAttribute(b'file-selector', self.file_selector.sdp_repr.encode())) + local_media.attributes.append(SDPAttribute(b'file-selector', self.file_selector.sdp_repr)) local_media.attributes.append(SDPAttribute(b'x-file-offset', b'')) if self.transfer_id is not None: local_media.attributes.append(SDPAttribute(b'file-transfer-id', self.transfer_id.encode())) return local_media @property def file_offset_supported(self): try: return 'x-file-offset' in self.remote_media.attributes except AttributeError: return False @run_in_twisted_thread def _NH_FileTransferHandlerDidInitialize(self, notification): session, direction = self._initialize_args del self._initialize_args if not self._done: super(FileTransferStream, self).initialize(session, direction) @run_in_twisted_thread def _NH_FileTransferHandlerDidNotInitialize(self, notification): del self._initialize_args if not self._done: notification.center.post_notification('MediaStreamDidNotInitialize', sender=self, data=notification.data) @run_in_twisted_thread def _NH_FileTransferHandlerError(self, notification): self._failure_reason = notification.data.error notification.center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='transferring', reason=self._failure_reason)) def _NH_MediaStreamDidNotInitialize(self, notification): notification.center.remove_observer(self, sender=self.handler) def _NH_MediaStreamWillEnd(self, notification): notification.center.remove_observer(self, sender=self.handler) def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. self.handler.process_chunk(chunk) def _handle_SEND(self, chunk): notification_center = NotificationCenter() if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.content_type.lower() == 'message/cpim': # In order to properly support the CPIM wrapper, msrplib needs to be refactored. -Luci self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') self._failure_reason = "CPIM wrapper is not supported" notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) return try: self.msrp_session.send_report(chunk, 200, 'OK') except Exception: pass # Best effort approach: even if we couldn't send the REPORT keep writing the chunks, we might have them all -Saul self.handler.process_chunk(chunk) def _handle_FILE_OFFSET(self, chunk): if self.direction != 'recvonly': response = make_response(chunk, 413, 'Unwanted message') self.msrp_session.send_chunk(response) return self.handler.process_chunk(chunk)