diff --git a/sipsimple/streams/msrp/__init__.py b/sipsimple/streams/msrp/__init__.py index 4e092246..e78fedb0 100644 --- a/sipsimple/streams/msrp/__init__.py +++ b/sipsimple/streams/msrp/__init__.py @@ -1,409 +1,415 @@ """ Handling of MSRP media streams according to RFC4975, RFC4976, RFC5547 and RFC3994. """ __all__ = ['MSRPStreamError', 'MSRPStreamBase'] import traceback from application.notification import NotificationCenter, NotificationData, IObserver from application.python import Null from application.system import host from twisted.internet.error import ConnectionDone from zope.interface import implementer from eventlib import api from msrplib.connect import DirectConnector, DirectAcceptor, RelayConnection, MSRPRelaySettings from msrplib.protocol import URI from msrplib.session import contains_mime_type from sipsimple.account import Account, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute, SDPConnection, SDPMediaStream from sipsimple.streams import IMediaStream, MediaStreamType, StreamError from sipsimple.threading.green import run_in_green_thread class MSRPStreamError(StreamError): pass @implementer(IMediaStream, IObserver) class MSRPStreamBase(object, metaclass=MediaStreamType): # Attributes that need to be defined by each MSRP stream type type = None priority = None msrp_session_class = None media_type = None accept_types = None accept_wrapped_types = None # These attributes are always False for any MSRP stream hold_supported = False on_hold = False on_hold_by_local = False on_hold_by_remote = False def __new__(cls, *args, **kw): if cls is MSRPStreamBase: raise TypeError("MSRPStreamBase cannot be instantiated directly") return object.__new__(cls) def __init__(self, direction='sendrecv'): self.direction = direction self.greenlet = None self.local_media = None self.remote_media = None self.msrp = None # Placeholder for the MSRPTransport that will be set when started self.msrp_connector = None self.cpim_enabled = None # Boolean value. 
None means it was not negotiated yet self.session = None self.msrp_session = None self.shutting_down = False self.local_role = None self.remote_role = None self.transport = None self.remote_accept_types = None self.remote_accept_wrapped_types = None self._initialize_done = False self._done = False self._failure_reason = None @property def local_uri(self): msrp = self.msrp or self.msrp_connector return msrp.local_uri if msrp is not None else None def _create_local_media(self, uri_path): transport = "TCP/TLS/MSRP" if uri_path[-1].use_tls else "TCP/MSRP" attributes = [] path = " ".join(str(uri) for uri in uri_path) attributes.append(SDPAttribute(b"path", path.encode())) if self.direction not in [None, 'sendrecv']: attributes.append(SDPAttribute(self.direction.encode(), b'')) if self.accept_types is not None: a_types = " ".join(self.accept_types) attributes.append(SDPAttribute(b"accept-types", a_types.encode())) if self.accept_wrapped_types is not None: a_w_types = " ".join(self.accept_wrapped_types) attributes.append(SDPAttribute(b"accept-wrapped-types", a_w_types.encode())) attributes.append(SDPAttribute(b"setup", self.local_role.encode() if self.local_role else None)) local_ip = uri_path[-1].host connection = SDPConnection(local_ip.encode()) return SDPMediaStream(self.media_type.encode(), uri_path[-1].port or 2855, transport.encode(), connection=connection, formats=[b"*"], attributes=attributes) # The public API (the IMediaStream interface) # noinspection PyUnusedLocal def get_local_media(self, remote_sdp=None, index=0): return self.local_media def new_from_sdp(self, session, remote_sdp, stream_index): raise NotImplementedError @run_in_green_thread def initialize(self, session, direction): self.greenlet = api.getcurrent() notification_center = NotificationCenter() notification_center.add_observer(self, sender=self) try: self.session = session self.transport = self.session.account.msrp.transport outgoing = direction == 'outgoing' logger = NotificationProxyLogger() if self.session.account is BonjourAccount(): if outgoing: self.msrp_connector = DirectConnector(logger=logger) self.local_role = 'active' else: if self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger) self.local_role = 'passive' else: if self.session.account.msrp.connection_model == 'relay': if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. 
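# A minimal sketch (not taken from the diff) of the SDP media section that
# _create_local_media() above would emit, assuming a TLS stream whose
# accept_types are ['message/cpim', 'text/plain']; the address, port and MSRP
# session id below are made-up illustration values.
example_msrp_sdp = (
    "m=message 2855 TCP/TLS/MSRP *\r\n"
    "c=IN IP4 192.0.2.10\r\n"
    "a=path:msrps://192.0.2.10:2855/ab3c;tcp\r\n"
    "a=accept-types:message/cpim text/plain\r\n"
    "a=accept-wrapped-types:text/plain\r\n"
    "a=setup:actpass\r\n"
)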
-Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' elif outgoing and not self.session.account.nat_traversal.use_msrp_relay_for_outbound: self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if self.session.account.nat_traversal.msrp_relay is None: relay_host = relay_port = None else: if self.transport != self.session.account.nat_traversal.msrp_relay.transport: raise MSRPStreamError("MSRP relay transport conflicts with MSRP transport setting") relay_host = self.session.account.nat_traversal.msrp_relay.host relay_port = self.session.account.nat_traversal.msrp_relay.port - relay = MSRPRelaySettings(domain=self.session.account.uri.host, - username=self.session.account.uri.user, - password=self.session.account.credentials.password, + relay = MSRPRelaySettings(domain=self.session.account.uri.host.decode(), + username=self.session.account.uri.user.decode(), + password=self.session.account.credentials.password.decode(), host=relay_host, port=relay_port, use_tls=self.transport=='tls') self.msrp_connector = RelayConnection(relay, 'passive', logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' else: if not outgoing and self.remote_role in ('actpass', 'passive'): # 'passive' not allowed by the RFC but play nice for interoperability. -Saul self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.local_role = 'active' else: if not outgoing and self.transport == 'tls' and None in (self.session.account.tls_credentials.cert, self.session.account.tls_credentials.key): raise MSRPStreamError("Cannot accept MSRP connection without a TLS certificate") self.msrp_connector = DirectAcceptor(logger=logger, use_sessmatch=True) self.local_role = 'actpass' if outgoing else 'passive' full_local_path = self.msrp_connector.prepare(local_uri=URI(host=host.default_ip, port=0, use_tls=self.transport=='tls', credentials=self.session.account.tls_credentials)) self.local_media = self._create_local_media(full_local_path) except Exception as e: traceback.print_exc() notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: notification_center.post_notification('MediaStreamDidInitialize', sender=self) finally: self._initialize_done = True self.greenlet = None # noinspection PyUnusedLocal @run_in_green_thread def start(self, local_sdp, remote_sdp, stream_index): self.greenlet = api.getcurrent() notification_center = NotificationCenter() context = 'sdp_negotiation' try: remote_media = remote_sdp.media[stream_index] self.remote_media = remote_media self.remote_accept_types = remote_media.attributes.getfirst(b'accept-types', b'').decode().split() self.remote_accept_wrapped_types = remote_media.attributes.getfirst(b'accept-wrapped-types', b'').decode().split() self.cpim_enabled = contains_mime_type(self.accept_types, 'message/cpim') and contains_mime_type(self.remote_accept_types, 'message/cpim') remote_uri_path = remote_media.attributes.getfirst(b'path') if remote_uri_path is None: raise AttributeError("remote SDP media does not have 'path' attribute") full_remote_path = [URI.parse(uri) for uri in remote_uri_path.decode().split()] remote_transport = 'tls' if full_remote_path[0].use_tls else 'tcp' if self.transport != remote_transport: raise MSRPStreamError("remote transport ('%s') different from local transport ('%s')" % (remote_transport, self.transport)) if isinstance(self.session.account, Account) and 
self.local_role == 'actpass': remote_setup = remote_media.attributes.getfirst('setup', 'passive') - remote_setup = remote_setup.decode() if remote_setup else None if remote_setup == 'passive': # If actpass is offered connectors are always started as passive # We need to switch to active if the remote answers with passive if self.session.account.msrp.connection_model == 'relay': self.msrp_connector.mode = 'active' else: local_uri = self.msrp_connector.local_uri logger = self.msrp_connector.logger self.msrp_connector = DirectConnector(logger=logger, use_sessmatch=True) self.msrp_connector.prepare(local_uri) context = 'start' self.msrp = self.msrp_connector.complete(full_remote_path) if self.msrp_session_class is not None: self.msrp_session = self.msrp_session_class(self.msrp, accept_types=self.accept_types, on_incoming_cb=self._handle_incoming, automatic_reports=False) self.msrp_connector = None except Exception as e: self._failure_reason = str(e) traceback.print_exc() notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context=context, reason=self._failure_reason)) else: notification_center.post_notification('MediaStreamDidStart', sender=self) finally: self.greenlet = None def deactivate(self): self.shutting_down = True @run_in_green_thread def end(self): if self._done: return self._done = True notification_center = NotificationCenter() if not self._initialize_done: # we are in the middle of initialize() try: msrp_connector = self.msrp_connector if self.greenlet is not None: api.kill(self.greenlet) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted')) notification_center.remove_observer(self, sender=self) self.msrp_connector = None self.greenlet = None else: notification_center.post_notification('MediaStreamWillEnd', sender=self) msrp = self.msrp msrp_session = self.msrp_session msrp_connector = self.msrp_connector try: if self.greenlet is not None: api.kill(self.greenlet) if msrp_session is not None: msrp_session.shutdown() elif msrp is not None: msrp.loseConnection(wait=False) if msrp_connector is not None: msrp_connector.cleanup() finally: notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) notification_center.remove_observer(self, sender=self) self.msrp = None self.msrp_session = None self.msrp_connector = None self.session = None self.greenlet = None # noinspection PyMethodMayBeStatic,PyUnusedLocal def validate_update(self, remote_sdp, stream_index): return True # TODO def update(self, local_sdp, remote_sdp, stream_index): pass # TODO def hold(self): pass def unhold(self): pass def reset(self, stream_index): pass # Internal IObserver interface def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) # Internal message handlers def _handle_incoming(self, chunk=None, error=None): notification_center = NotificationCenter() if error is not None: if self.shutting_down and isinstance(error.value, ConnectionDone): return self._failure_reason = error.getErrorMessage() notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) elif chunk is not None: method_handler = getattr(self, '_handle_%s' % chunk.method, None) if method_handler is not None: method_handler(chunk) def _handle_REPORT(self, 
chunk): pass def _handle_SEND(self, chunk): pass # temporary solution. to be replaced later by a better logging system in msrplib -Dan # class ChunkInfo(object): __slots__ = 'content_type', 'header', 'footer', 'data' def __init__(self, content_type, header='', footer='', data=''): self.content_type = content_type self.header = header self.footer = footer self.data = data def __repr__(self): return "{0.__class__.__name__}(content_type={0.content_type!r}, header={0.header!r}, footer={0.footer!r}, data={0.data!r})".format(self) @property def content(self): return self.header + self.data + self.footer @property def normalized_content(self): - if not self.data: - return self.header + self.footer + header = self.header.decode() if isinstance(self.header, bytes) else self.header + footer = self.footer.decode() if isinstance(self.footer, bytes) else self.footer + try: + data = self.data.decode() if isinstance(self.data, bytes) else self.data + except UnicodeDecodeError: + data = '<<>>' + + if not data: + return header + footer elif self.content_type == 'message/cpim': - headers, sep, body = self.data.partition('\r\n\r\n') + headers, sep, body = data.partition('\r\n\r\n') if not sep: - return self.header + self.data + self.footer + return header + data + footer mime_headers, mime_sep, mime_body = body.partition('\n\n') if not mime_sep: - return self.header + self.data + self.footer + return header + data + footer for mime_header in mime_headers.lower().splitlines(): if mime_header.startswith('content-type:'): wrapped_content_type = mime_header[13:].partition(';')[0].strip() break else: wrapped_content_type = None if wrapped_content_type is None or wrapped_content_type == 'application/im-iscomposing+xml' or wrapped_content_type.startswith(('text/', 'message/')): - data = self.data + data = data else: data = headers + sep + mime_headers + mime_sep + '<<>>' - return self.header + data + self.footer + return header + data + footer elif self.content_type is None or self.content_type == 'application/im-iscomposing+xml' or self.content_type.startswith(('text/', 'message/')): - return self.header + self.data + self.footer + return header + data + footer else: - return self.header + '<<>>' + self.footer + return header + '<<>>' + footer class NotificationProxyLogger(object): def __init__(self): from application import log self.level = log.level self.notification_center = NotificationCenter() self.log_settings = SIPSimpleSettings().logs def received_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.chunk_header, footer=data.chunk_footer, data=data.data) notification_data = NotificationData(direction='incoming', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def sent_chunk(self, data, transport): if self.log_settings.trace_msrp: chunk_info = ChunkInfo(data.content_type, header=data.encoded_header, footer=data.encoded_footer, data=data.data) notification_data = NotificationData(direction='outgoing', local_address=transport.getHost(), remote_address=transport.getPeer(), data=chunk_info.normalized_content, illegal=False) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def received_illegal_data(self, data, transport): if self.log_settings.trace_msrp: notification_data = NotificationData(direction='incoming', 
local_address=transport.getHost(), remote_address=transport.getPeer(), data=data, illegal=True) self.notification_center.post_notification('MSRPTransportTrace', sender=transport, data=notification_data) def debug(self, message, *args, **kw): pass def info(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.INFO)) def warning(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.WARNING)) warn = warning def error(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.ERROR)) def exception(self, message='', *args, **kw): if self.log_settings.trace_msrp: message = message % args if args else message exception = traceback.format_exc() self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message + '\n' + exception if message else exception, level=self.level.ERROR)) def critical(self, message, *args, **kw): if self.log_settings.trace_msrp: self.notification_center.post_notification('MSRPLibraryLog', data=NotificationData(message=message % args if args else message, level=self.level.CRITICAL)) fatal = critical from sipsimple.streams.msrp import chat, filetransfer, screensharing diff --git a/sipsimple/streams/msrp/filetransfer.py b/sipsimple/streams/msrp/filetransfer.py index ff616143..ce991012 100644 --- a/sipsimple/streams/msrp/filetransfer.py +++ b/sipsimple/streams/msrp/filetransfer.py @@ -1,755 +1,760 @@ """ This module provides classes to parse and generate SDP related to SIP sessions that negotiate File Transfer. """ __all__ = ['FileTransferStream', 'FileSelector'] import pickle as pickle import hashlib import mimetypes import os import random import re import time import uuid from abc import ABCMeta, abstractmethod from application.notification import NotificationCenter, NotificationData, IObserver from application.python.threadpool import ThreadPool, run_in_threadpool from application.python.types import MarkerType from application.system import FileExistsError, makedirs, openfile, unlink from itertools import count from msrplib.protocol import FailureReportHeader, SuccessReportHeader, ContentTypeHeader, IntegerHeaderType, MSRPNamedHeader, HeaderParsingError from msrplib.session import MSRPSession from msrplib.transport import make_response from queue import Queue from threading import Event, Lock from zope.interface import implementer from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import SDPAttribute from sipsimple.storage import ISIPSimpleApplicationDataStorage from sipsimple.streams import InvalidStreamError, UnknownStreamError from sipsimple.streams.msrp import MSRPStreamBase from sipsimple.threading import run_in_twisted_thread, run_in_thread #sha1 is much faster on large file than python native implementation from sipsimple.util import sha1 HASH = type(hashlib.sha1()) class RandomID(metaclass=MarkerType): pass class FileSelectorHash(str): _hash_re = re.compile(r'^sha-1(:[0-9A-F]{2}){20}$') _byte_re = re.compile(r'..') def __new__(cls, value): if isinstance(value, str): if value.startswith('sha1:'): # backward compatibility hack (sort of). 
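# A minimal sketch of the encoding that FileSelectorHash.encode_hash() below
# performs: a hashlib sha1 object becomes the textual selector form
# "sha-1:XX:XX:...:XX" (20 colon-separated uppercase hex bytes). The payload
# here is only an assumed example.
import hashlib
import re

digest = hashlib.sha1(b'example file contents')
encoded = 'sha-1:' + ':'.join(re.findall('..', digest.hexdigest().upper()))
# `encoded` now matches FileSelectorHash._hash_re, e.g. 'sha-1:3A:7B:...:C4'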
value = 'sha-1' + value[len('sha1'):] if not cls._hash_re.match(value): raise ValueError("Invalid hash value: {!r}".format(value)) return super(FileSelectorHash, cls).__new__(cls, value) elif isinstance(value, (HASH, sha1)): return super(FileSelectorHash, cls).__new__(cls, cls.encode_hash(value)) else: raise ValueError("Invalid hash value: {!r}".format(value)) def __eq__(self, other): if isinstance(other, str): return super(FileSelectorHash, self).__eq__(other) elif isinstance(other, (HASH, sha1)) and other.name.lower() == 'sha1': return super(FileSelectorHash, self).__eq__(self.encode_hash(other)) else: return NotImplemented def __ne__(self, other): return not self == other @classmethod def encode_hash(cls, hash_instance): if hash_instance.name.lower() != 'sha1': raise TypeError("Invalid hash type: {.name} (only sha1 hashes are supported).".format(hash_instance)) # unexpected as it may be, using a regular expression is the fastest method to do this return 'sha-1:' + ':'.join(cls._byte_re.findall(hash_instance.hexdigest().upper())) class FileSelector(object): _name_re = re.compile(r'name:"([^"]+)"') _size_re = re.compile(r'size:(\d+)') _type_re = re.compile(r'type:([^ ]+)') _hash_re = re.compile(r'hash:([^ ]+)') def __init__(self, name=None, type=None, size=None, hash=None, fd=None): # If present, hash should be a sha1 object or a string in the form: sha-1:72:24:5F:E8:65:3D:DA:F3:71:36:2F:86:D4:71:91:3E:E4:A2:CE:2E # According to the specification, only sha1 is supported ATM. self.name = name self.type = type self.size = size self.hash = hash self.fd = fd @property def hash(self): return self.__dict__['hash'] @hash.setter def hash(self, value): self.__dict__['hash'] = None if value is None else FileSelectorHash(value) @classmethod def parse(cls, string): + if isinstance(string, bytes): + string = string.decode() name_match = cls._name_re.search(string) size_match = cls._size_re.search(string) type_match = cls._type_re.search(string) hash_match = cls._hash_re.search(string) name = name_match and name_match.group(1) size = size_match and int(size_match.group(1)) type = type_match and type_match.group(1) hash = hash_match and hash_match.group(1) return cls(name, type, size, hash) @classmethod def for_file(cls, path, type=None, hash=None): name = str(path) fd = open(name, 'rb') size = os.fstat(fd.fileno()).st_size if type is None: mime_type, encoding = mimetypes.guess_type(name) if encoding is not None: type = 'application/x-%s' % encoding elif mime_type is not None: type = mime_type else: type = 'application/octet-stream' return cls(name, type, size, hash, fd) @property def sdp_repr(self): items = [('name', self.name and '"%s"' % os.path.basename(self.name)), ('type', self.type), ('size', self.size), ('hash', self.hash)] sdp = ' '.join('%s:%s' % (name, value) for name, value in items if value is not None) return sdp.encode() class UniqueFilenameGenerator(object): @classmethod def generate(cls, name): yield name prefix, extension = os.path.splitext(name) for x in count(1): yield "%s-%d%s" % (prefix, x, extension) class FileMetadataEntry(object): def __init__(self, hash, filename, partial_hash=None): self.hash = hash self.filename = filename self.mtime = os.path.getmtime(self.filename) self.partial_hash = partial_hash @classmethod def from_selector(cls, file_selector): return cls(file_selector.hash.lower(), file_selector.name) class FileTransfersMetadata(object): __filename__ = 'transfer_metadata' __lifetime__ = 60*60*24*7 def __init__(self): self.data = {} self.lock = Lock() self.loaded = 
False self.directory = None def _load(self): if self.loaded: return from sipsimple.application import SIPApplication if ISIPSimpleApplicationDataStorage.providedBy(SIPApplication.storage): self.directory = SIPApplication.storage.directory if self.directory is not None: try: with open(os.path.join(self.directory, self.__filename__), 'rb') as f: data = pickle.loads(f.read()) except Exception: data = {} now = time.time() for hash, entry in list(data.items()): try: mtime = os.path.getmtime(entry.filename) except OSError: data.pop(hash) else: if mtime != entry.mtime or now - mtime > self.__lifetime__: data.pop(hash) self.data.update(data) self.loaded = True @run_in_thread('file-io') def _save(self, data): if self.directory is not None: with open(os.path.join(self.directory, self.__filename__), 'wb') as f: f.write(data) def __enter__(self): self.lock.acquire() self._load() return self.data def __exit__(self, exc_type, exc_val, exc_tb): if None is exc_type is exc_val is exc_tb: self._save(pickle.dumps(self.data)) self.lock.release() @implementer(IObserver) class FileTransferHandler(object, metaclass=ABCMeta): threadpool = ThreadPool(name='FileTransfers', min_threads=0, max_threads=100) threadpool.start() def __init__(self): self.stream = None self.session = None self._started = False self._session_started = False self._initialize_done = False self._initialize_successful = False def initialize(self, stream, session): self.stream = stream self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=stream) notification_center.add_observer(self, sender=session) notification_center.add_observer(self, sender=self) @property def filename(self): return self.stream.file_selector.name if self.stream is not None else None @abstractmethod def start(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def process_chunk(self, chunk): raise NotImplementedError def __terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.stream) notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, sender=self) try: self.stream.file_selector.fd.close() except AttributeError: # when self.stream.file_selector.fd is None pass except IOError: # we can get this if we try to close while another thread is reading from it pass self.stream = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, None) if handler is not None: handler(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if not self._initialize_done: self.end() self.__terminate() def _NH_MediaStreamDidStart(self, notification): self._started = True self.start() def _NH_MediaStreamWillEnd(self, notification): if self._started: self.end() elif self._session_started: notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Refused')) def _NH_SIPSessionWillStart(self, notification): self._session_started = True def _NH_SIPSessionDidFail(self, notification): if not self._session_started and self._initialize_successful: if notification.data.code == 487: reason = 'Cancelled' else: reason = notification.data.reason or 'Failed' notification.center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=reason)) def 
_NH_FileTransferHandlerDidInitialize(self, notification): self._initialize_done = True self._initialize_successful = True def _NH_FileTransferHandlerDidNotInitialize(self, notification): self._initialize_done = True self._initialize_successful = False def _NH_FileTransferHandlerDidEnd(self, notification): self.__terminate() class OffsetHeader(MSRPNamedHeader): name = 'Offset' type = IntegerHeaderType class EndTransfer(metaclass=MarkerType): pass class IncomingFileTransferHandler(FileTransferHandler): metadata = FileTransfersMetadata() def __init__(self): super(IncomingFileTransferHandler, self).__init__() #self.hash = sha1() #TODO this fails - adi self.hash = hashlib.sha1() self.queue = Queue() self.offset = 0 self.received_chunks = 0 @property def save_directory(self): return self.__dict__.get('save_directory') @save_directory.setter def save_directory(self, value): if self.stream is not None: raise AttributeError('cannot set save_directory, transfer is in progress') self.__dict__['save_directory'] = value def initialize(self, stream, session): super(IncomingFileTransferHandler, self).initialize(stream, session) try: directory = self.save_directory or SIPSimpleSettings().file_transfer.directory.normalized makedirs(directory) with self.metadata as metadata: try: prev_file = metadata.pop(stream.file_selector.hash.lower()) mtime = os.path.getmtime(prev_file.filename) if mtime != prev_file.mtime: raise ValueError('file was modified') filename = os.path.join(directory, os.path.basename(stream.file_selector.name)) try: os.link(prev_file.filename, filename) except (AttributeError, OSError): stream.file_selector.name = prev_file.filename else: stream.file_selector.name = filename unlink(prev_file.filename) stream.file_selector.fd = openfile(stream.file_selector.name, 'ab') # open doesn't seek to END in append mode on win32 until first write, but openfile does self.offset = stream.file_selector.fd.tell() self.hash = prev_file.partial_hash except (KeyError, EnvironmentError, ValueError): for name in UniqueFilenameGenerator.generate(os.path.join(directory, os.path.basename(stream.file_selector.name))): try: stream.file_selector.fd = openfile(name, 'xb') except FileExistsError: continue else: stream.file_selector.name = name break except Exception as e: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) def end(self): self.queue.put(EndTransfer) def process_chunk(self, chunk): if chunk.method == 'SEND': if not self.received_chunks and chunk.byte_range.start == 1: self.stream.file_selector.fd.truncate(0) self.stream.file_selector.fd.seek(0) #self.hash = sha1() #TODO this fails - adi self.hash = hashlib.sha1() self.offset = 0 self.received_chunks += 1 self.queue.put(chunk) elif chunk.method == 'FILE_OFFSET': if self.received_chunks > 0: response = make_response(chunk, 413, 'Unwanted message') else: offset = self.stream.file_selector.fd.tell() response = make_response(chunk, 200, 'OK') response.add_header(OffsetHeader(offset)) self.stream.msrp_session.send_chunk(response) @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) file_selector = self.stream.file_selector fd = file_selector.fd while True: chunk = self.queue.get() if chunk is EndTransfer: break data = chunk.data try: fd.write(data) except 
EnvironmentError as e: fd.close() notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=str(e))) return self.hash.update(data) self.offset += chunk.size transferred_bytes = chunk.byte_range.start + chunk.size - 1 total_bytes = file_selector.size = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: break fd.close() # Transfer is finished if self.offset != self.stream.file_selector.size: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete file')) return if self.hash != self.stream.file_selector.hash: unlink(self.filename) # something got corrupted, better delete the file notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='File hash mismatch')) return notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) def _NH_MediaStreamDidNotInitialize(self, notification): if self.stream.file_selector.fd is not None: position = self.stream.file_selector.fd.tell() self.stream.file_selector.fd.close() if position == 0: unlink(self.stream.file_selector.name) super(IncomingFileTransferHandler, self)._NH_MediaStreamDidNotInitialize(notification) def _NH_FileTransferHandlerDidEnd(self, notification): if notification.data.error and self.stream.file_selector.hash is not None: if os.path.getsize(self.stream.file_selector.name) == 0: unlink(self.stream.file_selector.name) else: with self.metadata as metadata: entry = FileMetadataEntry.from_selector(self.stream.file_selector) entry.partial_hash = self.hash metadata[entry.hash] = entry super(IncomingFileTransferHandler, self)._NH_FileTransferHandlerDidEnd(notification) class OutgoingFileTransferHandler(FileTransferHandler): file_part_size = 64*1024 def __init__(self): super(OutgoingFileTransferHandler, self).__init__() self.stop_event = Event() self.finished_event = Event() self.file_offset_event = Event() self.message_id = '%x' % random.getrandbits(64) self.offset = 0 def initialize(self, stream, session): super(OutgoingFileTransferHandler, self).initialize(stream, session) if stream.file_selector.fd is None: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file descriptor not specified')) return if stream.file_selector.size == 0: NotificationCenter().post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='file is empty')) return if stream.file_selector.hash is None: self._calculate_file_hash() else: NotificationCenter().post_notification('FileTransferHandlerDidInitialize', sender=self) @run_in_threadpool(FileTransferHandler.threadpool) def _calculate_file_hash(self): file_hash = hashlib.sha1() processed = 0 notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=0, total=self.stream.file_selector.size)) file_selector = self.stream.file_selector fd = file_selector.fd while not self.stop_event.is_set(): try: content = fd.read(self.file_part_size) except EnvironmentError as e: 
fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason=str(e))) return if not content: file_selector.hash = file_hash notification_center.post_notification('FileTransferHandlerDidInitialize', sender=self) break file_hash.update(content) processed += len(content) notification_center.post_notification('FileTransferHandlerHashProgress', sender=self, data=NotificationData(processed=processed, total=file_selector.size)) else: fd.close() notification_center.post_notification('FileTransferHandlerDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted transfer')) def end(self): self.stop_event.set() self.file_offset_event.set() # in case we are busy waiting on it @run_in_threadpool(FileTransferHandler.threadpool) def start(self): notification_center = NotificationCenter() notification_center.post_notification('FileTransferHandlerDidStart', sender=self) if self.stream.file_offset_supported: self._send_file_offset_chunk() self.file_offset_event.wait() finished = False failure_reason = None fd = self.stream.file_selector.fd fd.seek(self.offset) try: while not self.stop_event.is_set(): try: data = fd.read(self.file_part_size) except EnvironmentError as e: failure_reason = str(e) break if not data: finished = True break self._send_chunk(data) finally: fd.close() if not finished: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason=failure_reason or 'Interrupted transfer')) return # Wait until the stream ends or we get all reports self.stop_event.wait() if self.finished_event.is_set(): notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=False, reason=None)) else: notification_center.post_notification('FileTransferHandlerDidEnd', sender=self, data=NotificationData(error=True, reason='Incomplete transfer')) def _on_transaction_response(self, response): if self.stop_event.is_set(): return if response.code != 200: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=response.comment)) self.end() @run_in_twisted_thread def _send_chunk(self, data): if self.stop_event.is_set(): return data_len = len(data) chunk = self.stream.msrp.make_send_request(message_id=self.message_id, data=data, start=self.offset+1, end=self.offset+data_len, length=self.stream.file_selector.size) chunk.add_header(ContentTypeHeader(self.stream.file_selector.type)) chunk.add_header(SuccessReportHeader('yes')) chunk.add_header(FailureReportHeader('yes')) try: self.stream.msrp_session.send_chunk(chunk, response_cb=self._on_transaction_response) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=str(e))) else: self.offset += data_len @run_in_twisted_thread def _send_file_offset_chunk(self): def response_cb(response): if not self.stop_event.is_set() and response.code == 200: try: offset = response.headers['Offset'].decoded except (KeyError, HeaderParsingError): offset = 0 self.offset = offset self.file_offset_event.set() if self.stop_event.is_set(): self.file_offset_event.set() return chunk = self.stream.msrp.make_request('FILE_OFFSET') # TODO: _ is illegal in MSRP method names according to RFC 4975 try: self.stream.msrp_session.send_chunk(chunk, response_cb=response_cb) except Exception as e: NotificationCenter().post_notification('FileTransferHandlerError', sender=self, 
data=NotificationData(error=str(e))) def process_chunk(self, chunk): # here we process the REPORT chunks notification_center = NotificationCenter() if chunk.status.code == 200: transferred_bytes = chunk.byte_range.end total_bytes = chunk.byte_range.total notification_center.post_notification('FileTransferHandlerProgress', sender=self, data=NotificationData(transferred_bytes=transferred_bytes, total_bytes=total_bytes)) if transferred_bytes == total_bytes: self.finished_event.set() self.end() else: notification_center.post_notification('FileTransferHandlerError', sender=self, data=NotificationData(error=chunk.status.comment)) self.end() class FileTransferMSRPSession(MSRPSession): def _handle_incoming_FILE_OFFSET(self, chunk): self._on_incoming_cb(chunk) class FileTransferStream(MSRPStreamBase): type = 'file-transfer' priority = 10 msrp_session_class = FileTransferMSRPSession media_type = 'message' accept_types = ['*'] accept_wrapped_types = None IncomingTransferHandler = IncomingFileTransferHandler OutgoingTransferHandler = OutgoingFileTransferHandler def __init__(self, file_selector, direction, transfer_id=RandomID): if direction not in ('sendonly', 'recvonly'): raise ValueError("direction must be one of 'sendonly' or 'recvonly'") super(FileTransferStream, self).__init__(direction=direction) self.file_selector = file_selector self.transfer_id = transfer_id if transfer_id is not RandomID else str(uuid.uuid4()) if direction == 'sendonly': self.handler = self.OutgoingTransferHandler() else: self.handler = self.IncomingTransferHandler() @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): remote_stream = remote_sdp.media[stream_index] if remote_stream.media != b'message': raise UnknownStreamError if b'file-selector' not in remote_stream.attributes: raise UnknownStreamError expected_transport = 'TCP/TLS/MSRP' if session.account.msrp.transport == 'tls' else 'TCP/MSRP' if remote_stream.transport != expected_transport.encode(): raise InvalidStreamError("expected %s transport in file transfer stream, got %s" % (expected_transport, remote_stream.transport)) if remote_stream.formats != [b'*']: raise InvalidStreamError("wrong format list specified") file_selector_attr = remote_stream.attributes.getfirst(b'file-selector') try: - file_selector = FileSelector.parse(file_selector_attr.decode()) + file_selector = FileSelector.parse(file_selector_attr) except Exception as e: raise InvalidStreamError("error parsing file-selector: {}".format(e)) transfer_id = remote_stream.attributes.getfirst(b'file-transfer-id', None) transfer_id = transfer_id.decode() if transfer_id else None try: if remote_stream.direction == b'sendonly': stream = cls(file_selector, 'recvonly', transfer_id) elif remote_stream.direction == b'recvonly': stream = cls(file_selector, 'sendonly', transfer_id) else: raise InvalidStreamError("wrong stream direction specified") except Exception as e: traceback.print_exc() stream.remote_role = remote_stream.attributes.getfirst(b'setup', b'active') return stream def initialize(self, session, direction): self._initialize_args = session, direction NotificationCenter().add_observer(self, sender=self.handler) self.handler.initialize(self, session) def _create_local_media(self, uri_path): local_media = super(FileTransferStream, self)._create_local_media(uri_path) local_media.attributes.append(SDPAttribute(b'file-selector', self.file_selector.sdp_repr)) local_media.attributes.append(SDPAttribute(b'x-file-offset', b'')) if self.transfer_id is not None: 
local_media.attributes.append(SDPAttribute(b'file-transfer-id', self.transfer_id.encode())) return local_media @property def file_offset_supported(self): try: return 'x-file-offset' in self.remote_media.attributes except AttributeError: return False @run_in_twisted_thread def _NH_FileTransferHandlerDidInitialize(self, notification): session, direction = self._initialize_args del self._initialize_args if not self._done: super(FileTransferStream, self).initialize(session, direction) @run_in_twisted_thread def _NH_FileTransferHandlerDidNotInitialize(self, notification): del self._initialize_args if not self._done: notification.center.post_notification('MediaStreamDidNotInitialize', sender=self, data=notification.data) @run_in_twisted_thread def _NH_FileTransferHandlerError(self, notification): self._failure_reason = notification.data.error notification.center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='transferring', reason=self._failure_reason)) def _NH_MediaStreamDidNotInitialize(self, notification): notification.center.remove_observer(self, sender=self.handler) def _NH_MediaStreamWillEnd(self, notification): - notification.center.remove_observer(self, sender=self.handler) + try: + notification.center.remove_observer(self, sender=self.handler) + except KeyError: + pass def _handle_REPORT(self, chunk): # in theory, REPORT can come with Byte-Range which would limit the scope of the REPORT to the part of the message. self.handler.process_chunk(chunk) def _handle_SEND(self, chunk): notification_center = NotificationCenter() if chunk.size == 0: # keep-alive self.msrp_session.send_report(chunk, 200, 'OK') return if self.direction=='sendonly': self.msrp_session.send_report(chunk, 413, 'Unwanted Message') return if chunk.content_type.lower() == 'message/cpim': # In order to properly support the CPIM wrapper, msrplib needs to be refactored. -Luci self.msrp_session.send_report(chunk, 415, 'Invalid Content-Type') self._failure_reason = "CPIM wrapper is not supported" notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='reading', reason=self._failure_reason)) return try: self.msrp_session.send_report(chunk, 200, 'OK') except Exception: pass # Best effort approach: even if we couldn't send the REPORT keep writing the chunks, we might have them all -Saul self.handler.process_chunk(chunk) def _handle_FILE_OFFSET(self, chunk): if self.direction != 'recvonly': response = make_response(chunk, 413, 'Unwanted message') self.msrp_session.send_chunk(response) return self.handler.process_chunk(chunk)
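# A minimal usage sketch for the file-transfer classes above, assuming the
# surrounding middleware (SIPApplication, Session) is set up elsewhere; the
# path and the size shown in the comment are illustration values only.
from sipsimple.streams.msrp.filetransfer import FileSelector, FileTransferStream

selector = FileSelector.for_file('/tmp/example.jpg')   # opens the file, fills name/type/size
stream = FileTransferStream(selector, 'sendonly')      # offer the file; 'recvonly' would accept one
# selector.sdp_repr would then look like b'name:"example.jpg" type:image/jpeg size:48211'
# (the hash selector is appended once OutgoingFileTransferHandler has computed it)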
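# Byte-Range arithmetic used by OutgoingFileTransferHandler._send_chunk() above,
# as a small self-check with assumed numbers: MSRP ranges are 1-based and
# inclusive, so a chunk of data_len bytes written from file offset `offset`
# covers bytes offset+1 .. offset+data_len of the total file size.
offset, data_len, total = 65536, 64 * 1024, 200000
start, end = offset + 1, offset + data_len
assert end - start + 1 == data_len
assert end <= total
transferred_bytes = end          # matches what a success REPORT reports back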