diff --git a/sipsimple/core/_core.mediatransport.pxi b/sipsimple/core/_core.mediatransport.pxi index ee61d10a..a0496cfb 100644 --- a/sipsimple/core/_core.mediatransport.pxi +++ b/sipsimple/core/_core.mediatransport.pxi @@ -1,2558 +1,2558 @@ import sys from errno import EADDRINUSE # classes cdef class RTPTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef char* c_pool_name cdef PJSIPUA ua ua = _get_ua() pool_name = b"RTPTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) self._af = pj_AF_INET() status = pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "rtp_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool self.state = "NULL" def __init__(self, encryption=None, use_ice=False, ice_stun_address=None, ice_stun_port=PJ_STUN_PORT): cdef PJSIPUA ua = _get_ua() if self.state != "NULL": raise SIPCoreError("RTPTransport.__init__() was already called") self._rtp_valid_pair = None self._encryption = encryption self.use_ice = use_ice self.ice_stun_address = ice_stun_address self.ice_stun_port = ice_stun_port def __dealloc__(self): cdef PJSIPUA ua cdef pjmedia_transport *transport cdef Timer timer try: ua = _get_ua() except: return transport = self._obj if transport != NULL: transport.user_data = NULL if self._wrapped_transport != NULL: self._wrapped_transport.user_data = NULL with nogil: pjmedia_transport_media_stop(transport) pjmedia_transport_close(transport) self._obj = NULL self._wrapped_transport = NULL ua.release_memory_pool(self._pool) self._pool = NULL if self._lock != NULL: pj_mutex_destroy(self._lock) timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self.state = "INVALID" self._obj = NULL self._wrapped_transport = NULL self._pool = NULL return None cdef void _get_info(self, pjmedia_transport_info *info): cdef int status cdef pjmedia_transport *transport transport = self._obj with nogil: pjmedia_transport_info_init(info) status = pjmedia_transport_get_info(transport, info) if status != 0: raise PJSIPError("Could not get transport info", status) property local_rtp_port: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) if pj_sockaddr_has_addr(&info.sock_info.rtp_addr_name): return pj_sockaddr_get_port(&info.sock_info.rtp_addr_name) else: return None finally: with nogil: pj_mutex_unlock(lock) property local_rtp_address: def __get__(self): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) if pj_sockaddr_has_addr(&info.sock_info.rtp_addr_name): return pj_sockaddr_print(&info.sock_info.rtp_addr_name, buf, PJ_INET6_ADDRSTRLEN, 0) else: return None finally: with nogil: pj_mutex_unlock(lock) property local_rtp_candidate: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._rtp_valid_pair: return self._rtp_valid_pair.local_candidate return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_port: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None if self._ice_active() and self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate.port self._get_info(&info) if pj_sockaddr_has_addr(&info.src_rtp_name): return pj_sockaddr_get_port(&info.src_rtp_name) else: return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_address: def __get__(self): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None if self._ice_active() and self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate.address self._get_info(&info) if pj_sockaddr_has_addr(&info.src_rtp_name): return pj_sockaddr_print(&info.src_rtp_name, buf, PJ_INET6_ADDRSTRLEN, 0) else: return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_candidate: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate return None finally: with nogil: pj_mutex_unlock(lock) property srtp_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_srtp_info *srtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) srtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_SRTP) if srtp_info != NULL: return bool(srtp_info.active) return False finally: with nogil: pj_mutex_unlock(lock) property srtp_cipher: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_srtp_info *srtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) srtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_SRTP) if srtp_info == NULL or not bool(srtp_info.active): return None return _pj_str_to_bytes(srtp_info.tx_policy.name) finally: with nogil: pj_mutex_unlock(lock) property zrtp_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info != NULL: return bool(zrtp_info.active) return False finally: with nogil: pj_mutex_unlock(lock) cdef int _ice_active(self): # this function needs to be called with the lock held cdef pjmedia_transport_info info cdef pjmedia_ice_transport_info *ice_info if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return 0 self._get_info(&info) ice_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ICE) if ice_info != NULL and ice_info.sess_state == PJ_ICE_STRANS_STATE_RUNNING: return 1 return 0 property ice_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: return bool(self._ice_active()) finally: with nogil: pj_mutex_unlock(lock) cdef int _init_local_sdp(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_pool_t *pool cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport pool = self._pool transport = self._obj pj_local_sdp = local_sdp.get_sdp_session() if remote_sdp is not None: pj_remote_sdp = remote_sdp.get_sdp_session() else: pj_remote_sdp = NULL if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if sdp_index >= pj_local_sdp.media_count: raise ValueError("sdp_index argument out of range") with nogil: status = pjmedia_transport_media_create(transport, pool, 0, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not create media transport", status) return 0 def set_LOCAL(self, SDPSession local_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if local_sdp is None: raise SIPCoreError("local_sdp argument cannot be None") if self.state == "LOCAL": return if self.state != "INIT": raise SIPCoreError('set_LOCAL can only be called in the "INIT" state, current state is "%s"' % self.state) self._init_local_sdp(local_sdp, None, sdp_index) self.state = "LOCAL" finally: with nogil: pj_mutex_unlock(lock) def set_REMOTE(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if None in [local_sdp, remote_sdp]: raise SIPCoreError("SDP arguments cannot be None") if self.state == "REMOTE": return if self.state != "INIT": raise SIPCoreError('set_REMOTE can only be called in the "INIT" state, current state is "%s"' % self.state) self._init_local_sdp(local_sdp, remote_sdp, sdp_index) self.state = "REMOTE" finally: with nogil: pj_mutex_unlock(lock) def set_ESTABLISHED(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport = self._obj _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: transport = self._obj if None in [local_sdp, remote_sdp]: raise SIPCoreError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if self.state == "ESTABLISHED": return if self.state not in ["LOCAL", "REMOTE"]: raise SIPCoreError('set_ESTABLISHED can only be called in the "INIT" and "LOCAL" states, ' + 'current state is "%s"' % self.state) with nogil: status = pjmedia_transport_media_start(transport, self._pool, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not start media transport", status) self.state = "ESTABLISHED" finally: with nogil: pj_mutex_unlock(lock) def set_INIT(self): global _ice_cb cdef int af cdef int i cdef int status cdef int port cdef pj_caching_pool *caching_pool cdef pj_ice_strans_cfg ice_cfg cdef pj_ice_strans *ice_st cdef pj_ice_strans_state ice_state cdef pj_mutex_t *lock = self._lock cdef pj_str_t local_ip cdef pj_str_t *local_ip_address cdef pjmedia_endpt *media_endpoint cdef pjmedia_srtp_setting srtp_setting cdef pjmedia_transport **transport_address cdef pjmedia_transport *wrapped_transport cdef pjsip_endpoint *sip_endpoint cdef bytes zid_file cdef char *c_zid_file cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: af = self._af caching_pool = &ua._caching_pool._obj media_endpoint = ua._pjmedia_endpoint._obj sip_endpoint = ua._pjsip_endpoint._obj transport_address = &self._obj if self.state == "INIT": return if self.state in ["LOCAL", "ESTABLISHED"]: with nogil: status = pjmedia_transport_media_stop(transport_address[0]) if status != 0: raise PJSIPError("Could not stop media transport", status) self.state = "INIT" elif self.state == "NULL": if ua.ip_address is None: local_ip_address = NULL else: _str_to_pj_str(ua.ip_address, &local_ip) local_ip_address = &local_ip if self.use_ice: with nogil: pj_ice_strans_cfg_default(&ice_cfg) ice_cfg.af = self._af with nogil: pj_stun_config_init(&ice_cfg.stun_cfg, &caching_pool.factory, 0, pjmedia_endpt_get_ioqueue(media_endpoint), pjsip_endpt_get_timer_heap(sip_endpoint)) if self.ice_stun_address is not None: _str_to_pj_str(self.ice_stun_address, &ice_cfg.stun.server) ice_cfg.stun.port = self.ice_stun_port # IIRC we can't choose the port for ICE with nogil: status = pj_sockaddr_init(ice_cfg.af, &ice_cfg.stun.cfg.bound_addr, local_ip_address, 0) if status != 0: raise PJSIPError("Could not init ICE bound address", status) with nogil: status = pjmedia_ice_create2(media_endpoint, NULL, 2, &ice_cfg, &_ice_cb, 0, transport_address) if status != 0: raise PJSIPError("Could not create ICE media transport", status) else: status = PJ_EBUG for i in xrange(ua._rtp_port_index, ua._rtp_port_index + ua._rtp_port_usable_count, 2): port = ua._rtp_port_start + i % ua._rtp_port_usable_count with nogil: status = pjmedia_transport_udp_create3(media_endpoint, af, NULL, local_ip_address, port, 0, transport_address) if status != PJ_ERRNO_START_SYS + EADDRINUSE: ua._rtp_port_index = (i + 2) % ua._rtp_port_usable_count break if status != 0: raise PJSIPError("Could not create UDP/RTP media transport", status) self._obj.user_data = self.weakref if self._encryption is not None: wrapped_transport = self._wrapped_transport = self._obj self._obj = NULL if self._encryption.startswith('sdes'): with nogil: pjmedia_srtp_setting_default(&srtp_setting) if self._encryption == 'sdes_mandatory': srtp_setting.use = PJMEDIA_SRTP_MANDATORY with nogil: status = pjmedia_transport_srtp_create(media_endpoint, wrapped_transport, &srtp_setting, transport_address) if status != 0: with nogil: pjmedia_transport_close(wrapped_transport) self._wrapped_transport = NULL raise PJSIPError("Could not create SRTP media transport", status) elif self._encryption == 'zrtp': with nogil: status = pjmedia_transport_zrtp_create(media_endpoint, pjsip_endpt_get_timer_heap(sip_endpoint), wrapped_transport, transport_address, 1) if status == 0: zid_file = ua.zrtp_cache.encode(sys.getfilesystemencoding()) c_zid_file = zid_file with nogil: # Auto-enable is deactivated status = pjmedia_transport_zrtp_initialize(self._obj, c_zid_file, 0, &_zrtp_cb) if status != 0: with nogil: pjmedia_transport_close(wrapped_transport) self._wrapped_transport = NULL raise PJSIPError("Could not create ZRTP media transport", status) else: raise RuntimeError('invalid SRTP key negotiation specified: %s' % self._encryption) self._obj.user_data = self.weakref if not self.use_ice or self.ice_stun_address is None: self.state = "INIT" _add_event("RTPTransportDidInitialize", dict(obj=self)) else: self.state = "WAIT_STUN" if self.use_ice: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=self, prev_state="NULL", state="GATHERING")) ice_st = pjmedia_ice_get_strans(transport_address[0]) if ice_st != NULL: ice_state = pj_ice_strans_get_state(ice_st) if ice_state == PJ_ICE_STRANS_STATE_READY: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=self, prev_state="GATHERING", state="GATHERING_COMPLETE")) else: raise SIPCoreError('set_INIT can only be called in the "NULL", "LOCAL" and "ESTABLISHED" states, ' + 'current state is "%s"' % self.state) finally: with nogil: pj_mutex_unlock(lock) def set_zrtp_sas_verified(self, verified): cdef int status cdef int c_verified cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return False c_verified = int(verified) with nogil: pjmedia_transport_zrtp_setSASVerified(self._obj, c_verified) return True finally: with nogil: pj_mutex_unlock(lock) def set_zrtp_enabled(self, enabled, object master_stream): cdef int status cdef int c_enabled cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua cdef bytes multistream_params cdef char *c_multistream_params cdef int length cdef RTPTransport master_transport ua = self._check_ua() if ua is None: return with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL: return if master_stream is not None: master_transport = master_stream._rtp_transport assert master_transport is not None # extract the multistream parameters multistream_params = master_transport.zrtp_multistream_parameters if multistream_params: # set multistream mode in ourselves c_multistream_params = multistream_params length = len(multistream_params) with nogil: pjmedia_transport_zrtp_setMultiStreamParameters(self._obj, c_multistream_params, length, master_transport._obj) c_enabled = int(enabled) with nogil: pjmedia_transport_zrtp_setEnableZrtp(self._obj, c_enabled) finally: with nogil: pj_mutex_unlock(lock) property zrtp_multistream_parameters: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua cdef char *multistr_params cdef int length ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None with nogil: multistr_params = pjmedia_transport_zrtp_getMultiStreamParameters(self._obj, &length) if length > 0: ret = _pj_buf_len_to_str(multistr_params, length) free(multistr_params) return ret else: return None finally: with nogil: pj_mutex_unlock(lock) property zrtp_cipher: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None return _buf_to_str(zrtp_info.cipher) finally: with nogil: pj_mutex_unlock(lock) property zrtp_peer_name: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return '' with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return '' self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return '' with nogil: c_name = pjmedia_transport_zrtp_getPeerName(self._obj) if c_name == NULL: return '' else: name = PyUnicode_FromString(c_name) or u'' free(c_name) return name finally: with nogil: pj_mutex_unlock(lock) def __set__(self, basestring name): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return name = name.encode('utf-8') c_name = name with nogil: pjmedia_transport_zrtp_putPeerName(self._obj, c_name) finally: with nogil: pj_mutex_unlock(lock) property zrtp_peer_id: def __get__(self): cdef int status cdef unsigned char name[12] # IDENTIFIER_LEN, 96bits cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None with nogil: status = pjmedia_transport_zrtp_getPeerZid(self._obj, name) if status <= 0: return None else: return _pj_buf_len_to_str(name, 12) finally: with nogil: pj_mutex_unlock(lock) def update_local_sdp(self, SDPSession local_sdp, BaseSDPSession remote_sdp=None, int sdp_index=0): cdef int status cdef pj_pool_t *pool cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport cdef SDPMediaStream local_media pool = self._pool transport = self._obj pj_local_sdp = local_sdp.get_sdp_session() if remote_sdp is not None: pj_remote_sdp = remote_sdp.get_sdp_session() else: pj_remote_sdp = NULL if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if sdp_index >= pj_local_sdp.media_count: raise ValueError("sdp_index argument out of range") # Remove ICE and SRTP/ZRTP related attributes from SDP, they will be added by pjmedia_transport_encode_sdp local_media = local_sdp.media[sdp_index] local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in ('crypto', 'zrtp-hash', 'ice-ufrag', 'ice-pwd', 'ice-mismatch', 'candidate', 'remote-candidates')] pj_local_sdp = local_sdp.get_sdp_session() with nogil: status = pjmedia_transport_encode_sdp(transport, pool, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not update SDP for media transport", status) local_sdp._update() return 0 cdef class MediaCheckTimer(Timer): def __init__(self, media_check_interval): self.media_check_interval = media_check_interval cdef class SDPInfo: def __init__(self, BaseSDPMediaStream local_media=None, BaseSDPSession local_sdp=None, BaseSDPSession remote_sdp=None, int index=0): self.local_media = local_media self.local_sdp = local_sdp self.remote_sdp = remote_sdp self.index = index property local_media: def __get__(self): return self._local_media def __set__(self, local_media): if local_media is not None: local_media = SDPMediaStream.new(local_media) self._local_media = local_media property local_sdp: def __get__(self): return self._local_sdp def __set__(self, local_sdp): if local_sdp is not None: local_sdp = SDPSession.new(local_sdp) self._local_sdp = local_sdp property remote_sdp: def __get__(self): return self._remote_sdp def __set__(self, remote_sdp): if remote_sdp is not None: remote_sdp = SDPSession.new(remote_sdp) self._remote_sdp = remote_sdp cdef class AudioTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef char* c_pool_name cdef PJSIPUA ua ua = _get_ua() pool_name = b"AudioTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) status = pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "audio_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool self._slot = -1 self._timer = None self._volume = 100 def __init__(self, AudioMixer mixer, RTPTransport transport, BaseSDPSession remote_sdp=None, int sdp_index=0, enable_silence_detection=False, list codecs=None): cdef int status cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media_c cdef pjmedia_sdp_session *local_sdp_c cdef pj_sockaddr *addr cdef pjmedia_transport_info info cdef list global_codecs cdef SDPMediaStream local_media cdef SDPSession local_sdp cdef PJSIPUA ua ua = _get_ua() media_endpoint = ua._pjmedia_endpoint._obj pool = self._pool if self.transport is not None: raise SIPCoreError("AudioTransport.__init__() was already called") if mixer is None: raise ValueError("mixer argument may not be None") if transport is None: raise ValueError("transport argument cannot be None") if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if transport.state != "INIT": raise SIPCoreError('RTPTransport object provided is not in the "INIT" state, but in the "%s" state' % transport.state) self._vad = int(bool(enable_silence_detection)) self.mixer = mixer self.transport = transport transport._get_info(&info) global_codecs = ua._pjmedia_endpoint._get_current_codecs() if codecs is None: codecs = global_codecs try: ua._pjmedia_endpoint._set_codecs(codecs) addr = &info.sock_info.rtp_addr_name with nogil: status = pjmedia_endpt_create_base_sdp(media_endpoint, pool, NULL, addr, &local_sdp_c) if status != 0: raise PJSIPError("Could not generate base SDP", status) with nogil: status = pjmedia_endpt_create_audio_sdp(media_endpoint, pool, &info.sock_info, 0, &local_media_c) if status != 0: raise PJSIPError("Could not generate SDP audio stream", status) # Create a 'fake' SDP, which only contains the audio stream, then the m line is extracted because the full # SDP is built by the Session local_sdp_c.media_count = 1 local_sdp_c.media[0] = local_media_c finally: ua._pjmedia_endpoint._set_codecs(global_codecs) local_sdp = SDPSession_create(local_sdp_c) local_media = local_sdp.media[0] if remote_sdp is None: self._is_offer = 1 self.transport.set_LOCAL(local_sdp, 0) else: self._is_offer = 0 if sdp_index != 0: local_sdp.media = [None] * (sdp_index+1) local_sdp.media[sdp_index] = local_media self.transport.set_REMOTE(local_sdp, remote_sdp, sdp_index) self._sdp_info = SDPInfo(local_media, local_sdp, remote_sdp, sdp_index) def __dealloc__(self): cdef PJSIPUA ua cdef Timer timer try: ua = _get_ua() except: return if self._obj != NULL: self.stop() ua.release_memory_pool(self._pool) self._pool = NULL if self._lock != NULL: pj_mutex_destroy(self._lock) timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self._obj = NULL self._pool = NULL return None property is_active: def __get__(self): self._check_ua() return bool(self._obj != NULL) property is_started: def __get__(self): return bool(self._is_started) property codec: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return _pj_str_to_bytes(self._stream_info.fmt.encoding_name) property sample_rate: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return self._stream_info.fmt.clock_rate property enable_silence_detection: def __get__(self): return bool(self._vad) property statistics: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_stream *stream cdef dict statistics = dict() cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return None with nogil: status = pjmedia_stream_get_stat(stream, &stat) if status != 0: raise PJSIPError("Could not get RTP statistics", status) statistics["rtt"] = _pj_math_stat_to_dict(&stat.rtt) statistics["rx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.rx) statistics["tx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.tx) return statistics finally: with nogil: pj_mutex_unlock(lock) property volume: def __get__(self): return self._volume def __set__(self, value): cdef int slot cdef int status cdef int volume cdef pj_mutex_t *lock = self._lock cdef pjmedia_conf *conf_bridge cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: conf_bridge = self.mixer._obj slot = self._slot if value < 0: raise ValueError("volume attribute cannot be negative") if ua is not None and self._obj != NULL: volume = int(value * 1.28 - 128) with nogil: status = pjmedia_conf_adjust_rx_level(conf_bridge, slot, volume) if status != 0: raise PJSIPError("Could not set volume of audio transport", status) self._volume = value finally: if ua is not None: with nogil: pj_mutex_unlock(lock) property slot: def __get__(self): self._check_ua() if self._slot == -1: return None else: return self._slot def get_local_media(self, BaseSDPSession remote_sdp=None, int index=0, direction="sendrecv"): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef object direction_attr cdef SDPAttribute attr cdef SDPSession local_sdp cdef SDPMediaStream local_media cdef pjmedia_sdp_media *c_local_media _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: is_offer = remote_sdp is None if is_offer and direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self._sdp_info.index = index local_sdp = self._sdp_info.local_sdp local_media = self._sdp_info.local_media local_sdp.media = [None] * (index+1) local_sdp.media[index] = local_media self.transport.update_local_sdp(local_sdp, remote_sdp, index) # updating the local SDP might have modified the connection line if local_sdp.connection is not None and local_media.connection is None: local_media.connection = SDPConnection.new(local_sdp.connection) local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in valid_sdp_directions] if is_offer: direction_attr = direction else: if self.direction is None or "recv" in self.direction.decode(): direction_attr = b"sendrecv" else: direction_attr = b"sendonly" local_media.attributes.append(SDPAttribute(direction_attr, b"")) for attribute in local_media.attributes: if attribute.name == b'rtcp': attribute.value = (attribute.value.decode().split(' ', 1)[0]).encode() self._sdp_info.local_media = local_media return local_media finally: with nogil: pj_mutex_unlock(lock) def start(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index, int timeout=30): cdef int status cdef object desired_state cdef pj_mutex_t *lock = self._lock cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_port *media_port cdef pjmedia_sdp_media *local_media cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_stream **stream_address cdef pjmedia_stream_info *stream_info_address cdef pjmedia_transport *transport cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: pool = self._pool media_endpoint = ua._pjmedia_endpoint._obj stream_address = &self._obj stream_info_address = &self._stream_info transport = self.transport._obj if self._is_started: raise SIPCoreError("This AudioTransport was already started once") desired_state = ("LOCAL" if self._is_offer else "REMOTE") if self.transport.state != desired_state: raise SIPCoreError('RTPTransport object provided is not in the "%s" state, but in the "%s" state' % (desired_state, self.transport.state)) if None in [local_sdp, remote_sdp]: raise ValueError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if local_sdp.media[sdp_index].port == 0 or remote_sdp.media[sdp_index].port == 0: raise SIPCoreError("Cannot start a rejected audio stream") if timeout < 0: raise ValueError("timeout value cannot be negative") self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index) with nogil: status = pjmedia_stream_info_from_sdp(stream_info_address, pool, media_endpoint, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not parse SDP for audio session", status) if self._stream_info.param == NULL: raise SIPCoreError("Could not parse SDP for audio session") self._stream_info.param.setting.vad = self._vad self._stream_info.use_ka = 1 with nogil: status = pjmedia_stream_create(media_endpoint, pool, stream_info_address, transport, NULL, stream_address) if status != 0: raise PJSIPError("Could not initialize RTP for audio session", status) with nogil: status = pjmedia_stream_set_dtmf_callback(stream_address[0], _AudioTransport_cb_dtmf, self.weakref) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not set DTMF callback for audio session", status) with nogil: status = pjmedia_stream_start(stream_address[0]) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not start RTP for audio session", status) with nogil: status = pjmedia_stream_get_port(stream_address[0], &media_port) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not get audio port for audio session", status) try: self._slot = self.mixer._add_port(ua, pool, media_port) if self._volume != 100: self.volume = self._volume except: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise self.update_direction(local_sdp.media[sdp_index].direction) self._sdp_info.local_media = local_sdp.media[sdp_index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = sdp_index self._is_started = 1 if timeout > 0: self._timer = MediaCheckTimer(timeout) self._timer.schedule(timeout, self._cb_check_rtp, self) self.mixer.reset_ec() finally: with nogil: pj_mutex_unlock(lock) def stop(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_stream *stream cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._timer is not None: self._timer.cancel() self._timer = None if self._obj == NULL: return self._obj = NULL self.mixer._remove_port(ua, self._slot) with nogil: pjmedia_stream_destroy(stream) self.transport.set_INIT() finally: if ua is not None: with nogil: pj_mutex_unlock(lock) def update_direction(self, direction): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") if direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) if direction != self.direction: self.mixer.reset_ec() self.direction = direction finally: with nogil: pj_mutex_unlock(lock) def update_sdp(self, local_sdp, remote_sdp, index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") self._sdp_info.local_media = local_sdp.media[index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = index finally: with nogil: pj_mutex_unlock(lock) def send_dtmf(self, digit): cdef int status cdef pj_mutex_t *lock = self._lock cdef pj_str_t digit_pj cdef pjmedia_stream *stream cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") if len(digit) != 1 or digit not in "0123456789*#ABCD": raise SIPCoreError("Not a valid DTMF digit: %s" % digit) _str_to_pj_str(digit.encode(), &digit_pj) if not self._stream_info.tx_event_pt < 0: # If the remote doesn't support telephone-event just don't send DTMF with nogil: status = pjmedia_stream_dial_dtmf(stream, &digit_pj) if status != 0: raise PJSIPError("Could not send DTMF digit on audio stream", status) finally: with nogil: pj_mutex_unlock(lock) cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_stream *stream with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return 0 if self._timer is None: return 0 self._timer = None with nogil: status = pjmedia_stream_get_stat(stream, &stat) if status == 0: if self._packets_received == stat.rx.pkt and self.direction == "sendrecv": _add_event("RTPAudioTransportDidTimeout", dict(obj=self)) self._packets_received = stat.rx.pkt if timer.media_check_interval > 0: self._timer = MediaCheckTimer(timer.media_check_interval) self._timer.schedule(timer.media_check_interval, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) cdef class VideoTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef PJSIPUA ua ua = _get_ua() endpoint = ua._pjsip_endpoint._obj pool_name = b"VideoTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool status = pj_mutex_create_recursive(pool, "video_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) self._timer = None def __init__(self, RTPTransport transport, BaseSDPSession remote_sdp=None, int sdp_index=0, list codecs=None): cdef int status cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media_c cdef pjmedia_sdp_session *local_sdp_c cdef pjmedia_transport_info info cdef pj_sockaddr *addr cdef list global_codecs cdef SDPMediaStream local_media cdef SDPSession local_sdp cdef PJSIPUA ua ua = _get_ua() media_endpoint = ua._pjmedia_endpoint._obj pool = self._pool if self.transport is not None: raise SIPCoreError("VideoTransport.__init__() was already called") if transport is None: raise ValueError("transport argument cannot be None") if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if transport.state != "INIT": raise SIPCoreError('RTPTransport object provided is not in the "INIT" state, but in the "%s" state' % transport.state) self.transport = transport transport._get_info(&info) global_codecs = ua._pjmedia_endpoint._get_current_video_codecs() if codecs is None: codecs = global_codecs try: ua._pjmedia_endpoint._set_video_codecs(codecs) addr = &(info.sock_info.rtp_addr_name) with nogil: status = pjmedia_endpt_create_base_sdp(media_endpoint, pool, NULL, addr, &local_sdp_c) if status != 0: raise PJSIPError("Could not generate base SDP", status) with nogil: status = pjmedia_endpt_create_video_sdp(media_endpoint, pool, &info.sock_info, 0, &local_media_c) if status != 0: raise PJSIPError("Could not generate SDP video stream", status) # Create a 'fake' SDP, which only contains the video stream, then the m line is extracted because the full # SDP is built by the Session local_sdp_c.media_count = 1 local_sdp_c.media[0] = local_media_c finally: ua._pjmedia_endpoint._set_video_codecs(global_codecs) local_sdp = SDPSession_create(local_sdp_c) local_media = local_sdp.media[0] if remote_sdp is None: self._is_offer = 1 self.transport.set_LOCAL(local_sdp, 0) else: self._is_offer = 0 if sdp_index != 0: local_sdp.media = [None] * (sdp_index+1) local_sdp.media[sdp_index] = local_media self.transport.set_REMOTE(local_sdp, remote_sdp, sdp_index) self._sdp_info = SDPInfo(local_media, local_sdp, remote_sdp, sdp_index) self.local_video = None self.remote_video = None def __dealloc__(self): cdef PJSIPUA ua cdef Timer timer try: ua = _get_ua() except SIPCoreError: return if self._obj != NULL: self.stop() if self._lock != NULL: pj_mutex_destroy(self._lock) ua.release_memory_pool(self._pool) self._pool = NULL timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self._obj = NULL self._pool = NULL return None property is_active: def __get__(self): self._check_ua() return bool(self._obj != NULL) property is_started: def __get__(self): return bool(self._is_started) property codec: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return _pj_str_to_bytes(self._stream_info.codec_info.encoding_name) property sample_rate: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return self._stream_info.codec_info.clock_rate property statistics: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_vid_stream *stream cdef dict statistics = dict() cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return None with nogil: status = pjmedia_vid_stream_get_stat(stream, &stat) if status != 0: raise PJSIPError("Could not get RTP statistics", status) statistics["rtt"] = _pj_math_stat_to_dict(&stat.rtt) statistics["rx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.rx) statistics["tx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.tx) return statistics finally: with nogil: pj_mutex_unlock(lock) def get_local_media(self, BaseSDPSession remote_sdp=None, int index=0, direction="sendrecv"): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef object direction_attr cdef SDPAttribute attr cdef SDPSession local_sdp cdef SDPMediaStream local_media cdef pjmedia_sdp_media *c_local_media _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: is_offer = remote_sdp is None if is_offer and direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self._sdp_info.index = index local_sdp = self._sdp_info.local_sdp local_media = self._sdp_info.local_media local_sdp.media = [None] * (index+1) local_sdp.media[index] = local_media self.transport.update_local_sdp(local_sdp, remote_sdp, index) # updating the local SDP might have modified the connection line if local_sdp.connection is not None and local_media.connection is None: local_media.connection = SDPConnection.new(local_sdp.connection) local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in valid_sdp_directions] if is_offer: direction_attr = direction else: if self.direction is None or "recv" in self.direction.decode(): direction_attr = b"sendrecv" else: direction_attr = b"sendonly" local_media.attributes.append(SDPAttribute(direction_attr, b"")) for attribute in local_media.attributes: if attribute.name == 'rtcp': attribute.value = (attribute.value.decode().split(' ', 1)[0]).encode() return local_media finally: with nogil: pj_mutex_unlock(lock) def start(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index, int timeout=30): cdef int status cdef object desired_state cdef pj_mutex_t *lock = self._lock cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_vid_stream *stream cdef pjmedia_vid_stream_info *stream_info cdef pjmedia_transport *transport cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: pool = self._pool media_endpoint = ua._pjmedia_endpoint._obj stream_info = &self._stream_info transport = self.transport._obj if self._is_started: raise SIPCoreError("This VideoTransport was already started once") desired_state = ("LOCAL" if self._is_offer else "REMOTE") if self.transport.state != desired_state: raise SIPCoreError('RTPTransport object provided is not in the "%s" state, but in the "%s" state' % (desired_state, self.transport.state)) if None in (local_sdp, remote_sdp): raise ValueError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if local_sdp.media[sdp_index].port == 0 or remote_sdp.media[sdp_index].port == 0: raise SIPCoreError("Cannot start a rejected video stream") if timeout < 0: raise ValueError("timeout value cannot be negative") self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index) with nogil: status = pjmedia_vid_stream_info_from_sdp(stream_info, pool, media_endpoint, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not parse SDP for video session", status) if self._stream_info.codec_param == NULL: raise SIPCoreError("Could not parse SDP for video session") self._stream_info.use_ka = 1 with nogil: status = pjmedia_vid_stream_create(media_endpoint, pool, stream_info, transport, NULL, &stream) if status != 0: raise PJSIPError("Could not initialize RTP for video session", status) self._obj = stream with nogil: status = pjmedia_vid_stream_start(stream) if status != 0: with nogil: pjmedia_vid_stream_destroy(stream) self._obj = NULL raise PJSIPError("Could not start RTP for video session", status) with nogil: pjmedia_vid_stream_send_rtcp_sdes(stream) try: local_video = LocalVideoStream_create(stream) remote_video = RemoteVideoStream_create(stream, self._remote_video_event_handler) except PJSIPError: with nogil: pjmedia_vid_stream_destroy(stream) self._obj = NULL self.local_video = None self.remote_video = None raise self.local_video = local_video self.remote_video = remote_video self.update_direction(local_sdp.media[sdp_index].direction) self._sdp_info.local_media = local_sdp.media[sdp_index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = sdp_index self._is_started = 1 if timeout > 0: self._timer = MediaCheckTimer(timeout) self._timer.schedule(timeout, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) def stop(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._timer is not None: self._timer.cancel() self._timer = None if self._obj == NULL: return self._obj = NULL if self.local_video is not None: self.local_video.close() self.local_video = None if self.remote_video is not None: self.remote_video.close() self.remote_video = None with nogil: pjmedia_vid_stream_send_rtcp_bye(stream) pjmedia_vid_stream_destroy(stream) self.transport.set_INIT() finally: if ua is not None: with nogil: pj_mutex_unlock(lock) def update_direction(self, direction): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") if direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self.direction = direction finally: with nogil: pj_mutex_unlock(lock) def update_sdp(self, local_sdp, remote_sdp, index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") self._sdp_info.local_media = local_sdp.media[index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = index finally: with nogil: pj_mutex_unlock(lock) def pause(self, direction="both"): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef pjmedia_dir pj_dir _get_ua() if direction not in ("incoming", "outgoing", "both"): raise ValueError("direction can only be one of 'incoming', 'outgoing' or 'both'") if direction == "incoming": pj_dir = PJMEDIA_DIR_RENDER elif direction == "outgoing": pj_dir = PJMEDIA_DIR_CAPTURE else: pj_dir = PJMEDIA_DIR_CAPTURE_RENDER with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") with nogil: status = pjmedia_vid_stream_pause(stream, pj_dir) if status != 0: raise PJSIPError("failed to pause video stream", status) finally: with nogil: pj_mutex_unlock(lock) def resume(self, direction="both"): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef pjmedia_dir pj_dir _get_ua() if direction not in ("incoming", "outgoing", "both"): raise ValueError("direction can only be one of 'incoming', 'outgoing' or 'both'") if direction == "incoming": pj_dir = PJMEDIA_DIR_RENDER elif direction == "outgoing": pj_dir = PJMEDIA_DIR_CAPTURE else: pj_dir = PJMEDIA_DIR_CAPTURE_RENDER with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") with nogil: status = pjmedia_vid_stream_resume(stream, pj_dir) if status != 0: raise PJSIPError("failed to resume video stream", status) finally: with nogil: pj_mutex_unlock(lock) def send_keyframe(self): cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream != NULL: # Do not check for errors, it's OK if we can't send it pjmedia_vid_stream_send_keyframe(stream) finally: with nogil: pj_mutex_unlock(lock) def request_keyframe(self): cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream != NULL: # Do not check for errors, it's OK if we can't send it pjmedia_vid_stream_send_rtcp_pli(stream) finally: with nogil: pj_mutex_unlock(lock) cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_vid_stream *stream with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return 0 if self._timer is None: return 0 self._timer = None with nogil: status = pjmedia_vid_stream_get_stat(stream, &stat) if status == 0: if self._packets_received == stat.rx.pkt and self.direction == "sendrecv": _add_event("RTPVideoTransportDidTimeout", dict(obj=self)) self._packets_received = stat.rx.pkt if timer.media_check_interval > 0: self._timer = MediaCheckTimer(timer.media_check_interval) self._timer.schedule(timer.media_check_interval, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) def _remote_video_event_handler(self, str name, object data): if name == "FORMAT_CHANGED": size, framerate = data _add_event("RTPVideoTransportRemoteFormatDidChange", dict(obj=self, size=size, framerate=framerate)) elif name == "RECEIVED_KEYFRAME": _add_event("RTPVideoTransportReceivedKeyFrame", dict(obj=self)) elif name == "MISSED_KEYFRAME": _add_event("RTPVideoTransportMissedKeyFrame", dict(obj=self)) elif name == "REQUESTED_KEYFRAME": _add_event("RTPVideoTransportRequestedKeyFrame", dict(obj=self)) cdef class ICECandidate: def __init__(self, component, cand_type, address, port, priority, rel_addr=''): self.component = component self.type = cand_type self.address = address self.port = port self.priority = priority self.rel_address = rel_addr def __str__(self): return '(%s) %s:%d%s priority=%d type=%s' % (self.component, self.address, self.port, ' rel_addr=%s' % self.rel_address if self.rel_address else '', self.priority, self.type.lower()) cdef ICECandidate ICECandidate_create(pj_ice_sess_cand *cand): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef str address cdef str cand_type cdef int port if cand.type == PJ_ICE_CAND_TYPE_HOST: cand_type = 'HOST' elif cand.type == PJ_ICE_CAND_TYPE_SRFLX: cand_type = 'SRFLX' elif cand.type == PJ_ICE_CAND_TYPE_PRFLX: cand_type = 'PRFLX' elif cand.type == PJ_ICE_CAND_TYPE_RELAYED: cand_type = 'RELAY' else: cand_type = 'UNKNOWN' pj_sockaddr_print(&cand.addr, buf, PJ_INET6_ADDRSTRLEN, 0) address = _buf_to_str(buf) port = pj_sockaddr_get_port(&cand.addr) if pj_sockaddr_has_addr(&cand.rel_addr): pj_sockaddr_print(&cand.rel_addr, buf, PJ_INET6_ADDRSTRLEN, 0) rel_addr = _buf_to_str(buf) else: rel_addr = '' return ICECandidate('RTP' if cand.comp_id==1 else 'RTCP', cand_type, address, port, cand.prio, rel_addr) cdef class ICECheck: def __init__(self, local_candidate, remote_candidate, state, nominated): self.local_candidate = local_candidate self.remote_candidate = remote_candidate self.state = state self.nominated = nominated def __str__(self): return '%s:%d -> %s:%d (%s, %s)' % (self.local_candidate.address, self.local_candidate.port, self.remote_candidate.address, self.remote_candidate.port, self.state.lower(), 'nominated' if self.nominated else 'not nominated') cdef ICECheck ICECheck_create(pj_ice_sess_check *check): cdef str state cdef ICECandidate lcand cdef ICECandidate rcand if check.state == PJ_ICE_SESS_CHECK_STATE_FROZEN: state = 'FROZEN' elif check.state == PJ_ICE_SESS_CHECK_STATE_WAITING: state = 'WAITING' elif check.state == PJ_ICE_SESS_CHECK_STATE_IN_PROGRESS: state = 'IN_PROGRESS' elif check.state == PJ_ICE_SESS_CHECK_STATE_SUCCEEDED: state = 'SUCCEEDED' elif check.state == PJ_ICE_SESS_CHECK_STATE_FAILED: state = 'FAILED' else: state = 'UNKNOWN' lcand = ICECandidate_create(check.lcand) rcand = ICECandidate_create(check.rcand) return ICECheck(lcand, rcand, state, bool(check.nominated)) cdef ICECheck _get_rtp_valid_pair(pj_ice_strans *ice_st): cdef pj_ice_sess_check_ptr_const ice_check ice_check = pj_ice_strans_get_valid_pair(ice_st, 1) if ice_check == NULL: return None return ICECheck_create(ice_check) # helper functions cdef dict _pj_math_stat_to_dict(pj_math_stat *stat): cdef dict retval = dict() retval["count"] = stat.n retval["max"] = stat.max retval["min"] = stat.min retval["last"] = stat.last retval["avg"] = stat.mean return retval cdef dict _pjmedia_rtcp_stream_stat_to_dict(pjmedia_rtcp_stream_stat *stream_stat): cdef dict retval = dict() retval["packets"] = stream_stat.pkt retval["bytes"] = stream_stat.bytes retval["packets_discarded"] = stream_stat.discard retval["packets_lost"] = stream_stat.loss retval["packets_reordered"] = stream_stat.reorder retval["packets_duplicate"] = stream_stat.dup retval["loss_period"] = _pj_math_stat_to_dict(&stream_stat.loss_period) retval["burst_loss"] = bool(stream_stat.loss_type.burst) retval["random_loss"] = bool(stream_stat.loss_type.random) retval["jitter"] = _pj_math_stat_to_dict(&stream_stat.jitter) return retval cdef str _ice_state_to_str(int state): if state == PJ_ICE_STRANS_STATE_NULL: return 'NULL' elif state == PJ_ICE_STRANS_STATE_INIT: return 'GATHERING' elif state == PJ_ICE_STRANS_STATE_READY: return 'GATHERING_COMPLETE' elif state == PJ_ICE_STRANS_STATE_SESS_READY: return 'NEGOTIATION_START' elif state == PJ_ICE_STRANS_STATE_NEGO: return 'NEGOTIATING' elif state == PJ_ICE_STRANS_STATE_RUNNING: return 'RUNNING' elif state == PJ_ICE_STRANS_STATE_FAILED: return 'FAILED' else: return 'UNKNOWN' cdef dict _extract_ice_session_data(pj_ice_sess *ice_sess): cdef dict data = dict() cdef pj_ice_sess_cand *cand cdef pj_ice_sess_check *check # Process local candidates local_candidates = [] for i in range(ice_sess.lcand_cnt): cand = &ice_sess.lcand[i] local_candidates.append(ICECandidate_create(cand)) data['local_candidates'] = local_candidates # Process remote candidates remote_candidates = [] for i in range(ice_sess.rcand_cnt): cand = &ice_sess.rcand[i] remote_candidates.append(ICECandidate_create(cand)) data['remote_candidates'] = remote_candidates # Process valid pairs valid_pairs = [] for i in range(ice_sess.comp_cnt): check = ice_sess.comp[i].valid_check valid_pairs.append(ICECheck_create(check)) data['valid_pairs'] = valid_pairs # Process valid list valid_list = [] for i in range(ice_sess.valid_list.count): check = &ice_sess.valid_list.checks[i] valid_list.append(ICECheck_create(check)) data['valid_list'] = valid_list return data cdef object _extract_rtp_transport(pjmedia_transport *tp): cdef void *rtp_transport_ptr = NULL if tp != NULL: rtp_transport_ptr = tp.user_data if rtp_transport_ptr == NULL: return None return ( rtp_transport_ptr)() # callback functions cdef void _RTPTransport_cb_ice_complete(pjmedia_transport *tp, pj_ice_strans_op op, int status) with gil: # Despite the name this callback is not only called when ICE negotiation ends, it depends on the # op parameter cdef int lock_status cdef double duration cdef pj_ice_strans *ice_st cdef pj_ice_sess *ice_sess cdef pj_time_val tv, start_time cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: lock_status = pj_mutex_lock(lock) if lock_status != 0: raise PJSIPError("failed to acquire lock", status) try: if op == PJ_ICE_STRANS_OP_NEGOTIATION: if status == 0: ice_st = pjmedia_ice_get_strans(tp) if ice_st == NULL: return ice_sess = pj_ice_strans_get_session(ice_st) if ice_sess == NULL: return start_time = pj_ice_strans_get_start_time(ice_st) pj_gettimeofday(&tv) tv.sec -= start_time.sec tv.msec -= start_time.msec pj_time_val_normalize(&tv) duration = (tv.sec*1000 + tv.msec)/1000.0 data = _extract_ice_session_data(ice_sess) rtp_transport._rtp_valid_pair = _get_rtp_valid_pair(ice_st) _add_event("RTPTransportICENegotiationDidSucceed", dict(obj=rtp_transport, duration=duration, local_candidates=data['local_candidates'], remote_candidates=data['remote_candidates'], valid_pairs=data['valid_pairs'], valid_list=data['valid_list'])) else: rtp_transport._rtp_valid_pair = None _add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status))) elif op == PJ_ICE_STRANS_OP_INIT: if status == 0: rtp_transport.state = "INIT" _add_event("RTPTransportDidInitialize", dict(obj=rtp_transport)) else: rtp_transport.state = "INVALID" _add_event("RTPTransportDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status))) else: # silence compiler warning pass finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_ice_state(pjmedia_transport *tp, pj_ice_strans_state prev, pj_ice_strans_state curr) with gil: cdef int status cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=rtp_transport, prev_state=_ice_state_to_str(prev), state=_ice_state_to_str(curr))) finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_ice_stop(pjmedia_transport *tp, char *reason, int err) with gil: cdef int status cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: rtp_transport._rtp_valid_pair = None _reason = reason if _reason != b"media stop requested": _add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_reason)) finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_zrtp_secure_on(pjmedia_transport *tp, char* cipher) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPSecureOn", dict(obj=rtp_transport, cipher=bytes(cipher))) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_secure_off(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPSecureOff", dict(obj=rtp_transport)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_show_sas(pjmedia_transport *tp, char* sas, int verified) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return - _add_event("RTPTransportZRTPReceivedSAS", dict(obj=rtp_transport, sas=str(sas), verified=bool(verified))) + _add_event("RTPTransportZRTPReceivedSAS", dict(obj=rtp_transport, sas=bytes(sas), verified=bool(verified))) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_confirm_goclear(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: not yet implemented by PJSIP's ZRTP transport except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_show_message(pjmedia_transport *tp, int severity, int sub_code) with gil: global zrtp_message_levels, zrtp_error_messages cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return level = zrtp_message_levels.get(severity, 1) message = zrtp_error_messages[level].get(sub_code, 'Unknown') _add_event("RTPTransportZRTPLog", dict(obj=rtp_transport, level=level, message=message)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_negotiation_failed(pjmedia_transport *tp, int severity, int sub_code) with gil: global zrtp_message_levels, zrtp_error_messages cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return level = zrtp_message_levels.get(severity, 1) reason = zrtp_error_messages[level].get(sub_code, 'Unknown') _add_event("RTPTransportZRTPNegotiationFailed", dict(obj=rtp_transport, reason=reason)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_not_supported_by_other(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPNotSupportedByRemote", dict(obj=rtp_transport)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_ask_enrollment(pjmedia_transport *tp, int info) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: implement PBX enrollment except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_inform_enrollment(pjmedia_transport *tp, int info) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: implement PBX enrollment except: ua._handle_exception(1) cdef void _AudioTransport_cb_dtmf(pjmedia_stream *stream, void *user_data, int digit) with gil: cdef AudioTransport audio_stream = ( user_data)() cdef PJSIPUA ua try: ua = _get_ua() except: return if audio_stream is None: return try: _add_event("RTPAudioStreamGotDTMF", dict(obj=audio_stream, digit=chr(digit))) except: ua._handle_exception(1) # globals cdef pjmedia_ice_cb _ice_cb _ice_cb.on_ice_complete = _RTPTransport_cb_ice_complete _ice_cb.on_ice_state = _RTPTransport_cb_ice_state _ice_cb.on_ice_stop = _RTPTransport_cb_ice_stop valid_sdp_directions = (b"sendrecv", b"sendonly", b"recvonly", b"inactive") # ZRTP cdef pjmedia_zrtp_cb _zrtp_cb _zrtp_cb.secure_on = _RTPTransport_cb_zrtp_secure_on _zrtp_cb.secure_off = _RTPTransport_cb_zrtp_secure_off _zrtp_cb.show_sas = _RTPTransport_cb_zrtp_show_sas _zrtp_cb.confirm_go_clear = _RTPTransport_cb_zrtp_confirm_goclear _zrtp_cb.show_message = _RTPTransport_cb_zrtp_show_message _zrtp_cb.negotiation_failed = _RTPTransport_cb_zrtp_negotiation_failed _zrtp_cb.not_supported_by_other = _RTPTransport_cb_zrtp_not_supported_by_other _zrtp_cb.ask_enrollment = _RTPTransport_cb_zrtp_ask_enrollment _zrtp_cb.inform_enrollment = _RTPTransport_cb_zrtp_inform_enrollment _zrtp_cb.sign_sas = NULL _zrtp_cb.check_sas_signature = NULL # Keep these aligned with ZrtpCodes.h cdef dict zrtp_message_levels = {1: 'INFO', 2: 'WARNING', 3: 'SEVERE', 4: 'ERROR'} cdef dict zrtp_error_messages = { 'INFO': { 0: "Unknown", 1: "Hello received and prepared a Commit, ready to get peer's hello hash", #InfoHelloReceived 2: "Commit: Generated a public DH key", #InfoCommitDHGenerated 3: "Responder: Commit received, preparing DHPart1", #InfoRespCommitReceived 4: "DH1Part: Generated a public DH key", #InfoDH1DHGenerated 5: "Initiator: DHPart1 received, preparing DHPart2", #InfoInitDH1Received 6: "Responder: DHPart2 received, preparing Confirm1", #InfoRespDH2Received 7: "Initiator: Confirm1 received, preparing Confirm2", #InfoInitConf1Received 8: "Responder: Confirm2 received, preparing Conf2Ack", #InfoRespConf2Received 9: "At least one retained secrets matches - security OK", #InfoRSMatchFound 10: "Entered secure state", #InfoSecureStateOn 11: "No more security for this session", #InfoSecureStateOff }, 'WARNING': { 0: "Unknown", 1: "WarningDHAESmismatch = 1, //!< Commit contains an AES256 cipher but does not offer a Diffie-Helman 4096 - not used DH4096 was discarded", #WarningDHAESmismatch 2: "Received a GoClear message", #WarningGoClearReceived 3: "Hello offers an AES256 cipher but does not offer a Diffie-Helman 4096- not used DH4096 was discarded", #WarningDHShort 4: "No retained shared secrets available - must verify SAS", #WarningNoRSMatch 5: "Internal ZRTP packet checksum mismatch - packet dropped", #WarningCRCmismatch 6: "Dropping packet because SRTP authentication failed!", #WarningSRTPauthError 7: "Dropping packet because SRTP replay check failed!", #WarningSRTPreplayError 8: "Valid retained shared secrets availabe but no matches found - must verify SAS", #WarningNoExpectedRSMatch }, 'SEVERE': { 0: "Unknown", 1: "Hash HMAC check of Hello failed!", #SevereHelloHMACFailed 2: "Hash HMAC check of Commit failed!", #SevereCommitHMACFailed 3: "Hash HMAC check of DHPart1 failed!", #SevereDH1HMACFailed 4: "Hash HMAC check of DHPart2 failed!", #SevereDH2HMACFailed 5: "Cannot send data - connection or peer down?", #SevereCannotSend 6: "Internal protocol error occured!", #SevereProtocolError 7: "Cannot start a timer - internal resources exhausted?", #SevereNoTimer 8: "Too much retries during ZRTP negotiation - connection or peer down?", #SevereTooMuchRetries }, 'ERROR': { 0x00: "Unknown", 0x10: "Malformed packet (CRC OK, but wrong structure)", #MalformedPacket 0x20: "Critical software error", #CriticalSWError 0x30: "Unsupported ZRTP version", #UnsuppZRTPVersion 0x40: "Hello components mismatch", #HelloCompMismatch 0x51: "Hash type not supported", #UnsuppHashType 0x52: "Cipher type not supported", #UnsuppCiphertype 0x53: "Public key exchange not supported", #UnsuppPKExchange 0x54: "SRTP auth. tag not supported", #UnsuppSRTPAuthTag 0x55: "SAS scheme not supported", #UnsuppSASScheme 0x56: "No shared secret available, DH mode required", #NoSharedSecret 0x61: "DH Error: bad pvi or pvr ( == 1, 0, or p-1)", #DHErrorWrongPV 0x62: "DH Error: hvi != hashed data", #DHErrorWrongHVI 0x63: "Received relayed SAS from untrusted MiTM", #SASuntrustedMiTM 0x70: "Auth. Error: Bad Confirm pkt HMAC", #ConfirmHMACWrong 0x80: "Nonce reuse", #NonceReused 0x90: "Equal ZIDs in Hello", #EqualZIDHello 0x100: "GoClear packet received, but not allowed", #GoCleatNotAllowed 0x7fffffff: "Packet ignored", #IgnorePacket } } diff --git a/sipsimple/streams/rtp/__init__.py b/sipsimple/streams/rtp/__init__.py index 8f1635ad..3900872f 100644 --- a/sipsimple/streams/rtp/__init__.py +++ b/sipsimple/streams/rtp/__init__.py @@ -1,619 +1,619 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and RFC5245. """ __all__ = ['RTPStream'] import weakref from abc import ABCMeta, abstractmethod from application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy from application.python import Null from threading import RLock from zope.interface import implementer from sipsimple.account import BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import RTPTransport, SIPCoreError, SIPURI from sipsimple.lookup import DNSLookup from sipsimple.streams import IMediaStream, InvalidStreamError, MediaStreamType, UnknownStreamError from sipsimple.threading import run_in_thread @implementer(IObserver) class ZRTPStreamOptions(object): def __init__(self, stream): self._stream = stream self.__dict__['master'] = None self.__dict__['sas'] = None self.__dict__['verified'] = False self.__dict__['peer_name'] = '' @property def sas(self): if self.master is not None: return self.master.encryption.zrtp.sas return self.__dict__['sas'] @property def verified(self): if self.master is not None: return self.master.encryption.zrtp.verified return self.__dict__['verified'] @verified.setter def verified(self, verified): if self.__dict__['verified'] == verified: return if self.sas is None: raise AttributeError('Cannot verify peer before SAS is received') if self.master is not None: self.master.encryption.zrtp.verified = verified else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_verified(rtp_transport, verified): rtp_transport.set_zrtp_sas_verified(verified) notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPVerifiedStateChanged', sender=self._stream, data=NotificationData(verified=verified)) self.__dict__['verified'] = verified update_verified(rtp_transport, verified) @property def peer_id(self): if self.master is not None: return self.master.encryption.zrtp.peer_id rtp_transport = self._stream._rtp_transport if rtp_transport is None: return None return rtp_transport.zrtp_peer_id @property def peer_name(self): if self.master is not None: return self.master.encryption.zrtp.peer_name return self.__dict__['peer_name'] @peer_name.setter def peer_name(self, name): if self.__dict__['peer_name'] == name: return if self.master is not None: self.master.encryption.zrtp.peer_name = name else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_name(rtp_transport, name): rtp_transport.zrtp_peer_name = name notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPPeerNameChanged', sender=self._stream, data=NotificationData(name=name)) self.__dict__['peer_name'] = name update_name(rtp_transport, name) @property def master(self): return self.__dict__['master'] @master.setter def master(self, master): old_master = self.__dict__['master'] if old_master is master: return notification_center = NotificationCenter() if old_master is not None: notification_center.remove_observer(self, sender=old_master) if master is not None: notification_center.add_observer(self, sender=master) self.__dict__['master'] = master def _enable(self, master_stream=None): rtp_transport = self._stream._rtp_transport if rtp_transport is None: return if master_stream is not None and not (master_stream.encryption.active and master_stream.encryption.type == 'ZRTP'): raise RuntimeError('Master stream must have ZRTP encryption activated') rtp_transport.set_zrtp_enabled(True, master_stream) self.master = master_stream def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamZRTPReceivedSAS(self, notification): # ZRTP begins on the audio stream, so this notification will only be processed # by the other streams self.__dict__['sas'] = notification.data.sas self.__dict__['verified'] = notification.data.verified self.__dict__['peer_name'] = notification.data.peer_name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPVerifiedStateChanged(self, notification): self.__dict__['verified'] = notification.data.verified notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPPeerNameChanged(self, notification): self.__dict__['peer_name'] = notification.data.name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_MediaStreamDidEnd(self, notification): self.master = None @implementer(IObserver) class RTPStreamEncryption(object): def __init__(self, stream): self._stream_ref = weakref.ref(stream) # Keep a weak reference before the stream is initialized to avoid a memory cycle that would delay releasing audio resources self._stream = None # We will store the actual reference once it's initialized and we're guaranteed to get MediaStreamDidEnd and do the cleanup self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None notification_center = NotificationCenter() notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') @property def active(self): rtp_transport = self._rtp_transport if rtp_transport is None: return False if self.type == 'SRTP/SDES': return rtp_transport.srtp_active elif self.type == 'ZRTP': return rtp_transport.zrtp_active return False @property def cipher(self): rtp_transport = self._rtp_transport if rtp_transport is None: return None if self.type == 'SRTP/SDES': return rtp_transport.srtp_cipher elif self.type == 'ZRTP': return rtp_transport.zrtp_cipher return None @property def type(self): return self.__dict__['type'] @property def zrtp(self): zrtp = self.__dict__['zrtp'] if zrtp is None: raise RuntimeError('ZRTP options have not been initialized') return zrtp def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidInitialize(self, notification): stream = notification.sender if stream is self._stream_ref(): self._stream = stream self._rtp_transport = stream._rtp_transport notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') notification.center.add_observer(self, sender=self._stream) notification.center.add_observer(self, sender=self._rtp_transport) encryption = stream._srtp_encryption or '' if encryption.startswith('sdes'): self.__dict__['type'] = 'SRTP/SDES' elif encryption == 'zrtp': self.__dict__['type'] = 'ZRTP' self.__dict__['zrtp'] = ZRTPStreamOptions(stream) def _NH_MediaStreamDidNotInitialize(self, notification): if notification.sender is self._stream_ref(): notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') self._stream_ref = None self._stream = None def _NH_MediaStreamDidStart(self, notification): if self.type == 'SRTP/SDES': stream = notification.sender if self.active: notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) else: reason = 'Not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_MediaStreamDidEnd(self, notification): notification.center.remove_observer(self, sender=self._stream) notification.center.remove_observer(self, sender=self._rtp_transport) self._stream = None self._stream_ref = None self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None def _NH_RTPTransportZRTPSecureOn(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) def _NH_RTPTransportZRTPSecureOff(self, notification): # We should never get here because we don't allow disabling encryption -Saul pass def _NH_RTPTransportZRTPReceivedSAS(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return self.zrtp.__dict__['sas'] = sas = notification.data.sas self.zrtp.__dict__['verified'] = verified = notification.data.verified self.zrtp.__dict__['peer_name'] = peer_name = notification.sender.zrtp_peer_name - notification.center.post_notification('RTPStreamZRTPReceivedSAS', sender=stream, data=NotificationData(sas=sas, verified=verified, peer_name=peer_name)) + notification.center.post_notification('RTPStreamZRTPReceivedSAS', sender=stream, data=NotificationData(sas=sas.decode(), verified=verified, peer_name=peer_name)) def _NH_RTPTransportZRTPLog(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamZRTPLog', sender=stream, data=notification.data) def _NH_RTPTransportZRTPNegotiationFailed(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'Negotiation failed: %s' % notification.data.reason notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_RTPTransportZRTPNotSupportedByRemote(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'ZRTP not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) class RTPStreamType(ABCMeta, MediaStreamType): pass @implementer(IMediaStream, IObserver) class RTPStream(object, metaclass=RTPStreamType): type = None priority = None hold_supported = True def __init__(self): self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = "NULL" self.session = None self.encryption = RTPStreamEncryption(self) self._transport = None self._hold_request = None self._ice_state = "NULL" self._lock = RLock() self._rtp_transport = None self._try_ice = False self._srtp_encryption = None self._remote_rtp_address_sdp = None self._remote_rtp_port_sdp = None self._initialized = False self._done = False self._failure_reason = None @property def codec(self): return self._transport.codec if self._transport else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def local_rtp_candidate(self): return self._rtp_transport.local_rtp_candidate if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_address if self._rtp_transport else None return self._remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_port if self._rtp_transport else None return self._remote_rtp_port_sdp if self._rtp_transport else None @property def remote_rtp_candidate(self): return self._rtp_transport.remote_rtp_candidate if self._rtp_transport else None @property def ice_active(self): return self._ice_state == "IN_USE" @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote @abstractmethod def start(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def update(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def validate_update(self, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def deactivate(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def reset(self, stream_index): raise NotImplementedError def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == "ESTABLISHED" and self.direction != "inactive": self._pause() self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == "ESTABLISHED" and self._hold_request == 'hold': self._resume() self._hold_request = None if self._hold_request == 'hold' else 'unhold' @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] if remote_stream.media != cls.type.encode(): raise UnknownStreamError if remote_stream.transport not in (b'RTP/AVP', b'RTP/SAVP'): raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in %s stream, got %s" % (cls.type, remote_stream.transport.decode())) local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None if local_encryption_policy == "sdes_mandatory" and not b"crypto" in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") if remote_stream.transport == b'RTP/SAVP' and b"crypto" in remote_stream.attributes and local_encryption_policy not in ("opportunistic", "sdes_optional", "sdes_mandatory"): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") account_preferred_codecs = getattr(session.account.rtp, '%s_codec_list' % cls.type) general_codecs = getattr(settings.rtp, '%s_codec_list' % cls.type) supported_codecs = account_preferred_codecs or general_codecs if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError("no compatible codecs found") stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index return stream def initialize(self, session, direction): with self._lock: if self.state != "NULL": raise RuntimeError("%sStream.initialize() may only be called in the NULL state" % self.type.capitalize()) self.state = "INITIALIZING" self.session = session local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None if hasattr(self, "_incoming_remote_sdp") and hasattr(self, '_incoming_stream_index'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = self.session.account.nat_traversal.use_ice and ((remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates) if "zrtp-hash" in remote_stream.attributes: incoming_stream_encryption = 'zrtp' elif "crypto" in remote_stream.attributes: incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: incoming_stream_encryption = None if incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy else: self._try_ice = self.session.account.nat_traversal.use_ice self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy if self._try_ice: if self.session.account.nat_traversal.stun_server_list: stun_servers = list((server.host, server.port) for server in self.session.account.nat_traversal.stun_server_list) self._init_rtp_transport(stun_servers) elif not isinstance(self.session.account, BonjourAccount): dns_lookup = DNSLookup() self.notification_center.add_observer(self, sender=dns_lookup) dns_lookup.lookup_service(SIPURI(self.session.account.id.domain), "stun") else: self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ("INITIALIZED", "WAIT_ICE", "ESTABLISHED"): raise RuntimeError("%sStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states" % self.type.capitalize()) if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = "sendrecv" elif b"send" in old_direction: new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv") else: new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly") else: new_direction = None new_direction = new_direction.encode() if new_direction else None return self._transport.get_local_media(remote_sdp, index, new_direction) # Notifications def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport() def _NH_DNSLookupDidSucceed(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport(notification.data.result) def _NH_RTPTransportDidInitialize(self, notification): rtp_transport = notification.sender with self._lock: if self.state == "ENDED": self.notification_center.remove_observer(self, sender=rtp_transport) return del self._rtp_args del self._stun_servers remote_sdp = self.__dict__.pop('_incoming_remote_sdp', None) stream_index = self.__dict__.pop('_incoming_stream_index', None) try: if remote_sdp is not None: transport = self._create_transport(rtp_transport, remote_sdp=remote_sdp, stream_index=stream_index) self._save_remote_sdp_rtp_info(remote_sdp, stream_index) else: transport = self._create_transport(rtp_transport) except SIPCoreError as e: self.state = "ENDED" self.notification_center.remove_observer(self, sender=rtp_transport) self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = transport self.notification_center.add_observer(self, sender=transport) self._initialized = True self.state = "INITIALIZED" self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPTransportDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != "NULL" or self.state not in ("INITIALIZING", "INITIALIZED", "WAIT_ICE"): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "IN_USE" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "FAILED" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args["encryption"] = self._srtp_encryption self._rtp_args["use_ice"] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() try: stun_address = stun_address.encode() if stun_address else None rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) except SIPCoreError as e: self._try_next_rtp_transport(e.args[0]) else: self.notification_center.add_observer(self, sender=rtp_transport) try: rtp_transport.set_INIT() except SIPCoreError as e: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _save_remote_sdp_rtp_info(self, remote_sdp, index): connection = remote_sdp.media[index].connection or remote_sdp.connection self._remote_rtp_address_sdp = connection.address self._remote_rtp_port_sdp = remote_sdp.media[index].port @abstractmethod def _create_transport(self, rtp_transport, remote_sdp=None, stream_index=None): raise NotImplementedError @abstractmethod def _check_hold(self, direction, is_initial): raise NotImplementedError @abstractmethod def _pause(self): raise NotImplementedError @abstractmethod def _resume(self): raise NotImplementedError from sipsimple.streams.rtp import audio, video