diff --git a/sipsimple/core/_core.invitation.pxi b/sipsimple/core/_core.invitation.pxi index 7cd660fe..46cc0e22 100644 --- a/sipsimple/core/_core.invitation.pxi +++ b/sipsimple/core/_core.invitation.pxi @@ -1,1860 +1,1856 @@ import weakref from errno import EADDRNOTAVAIL, ENETUNREACH from operator import itemgetter # classes cdef class SDPPayloads: def __init__(self): self.proposed_local = None self.proposed_remote = None self.active_local = None self.active_remote = None cdef class StateCallbackTimer(Timer): def __init__(self, state, sub_state, rdata, tdata, originator): self.state = state self.sub_state = sub_state self.rdata = rdata self.tdata = tdata self.originator = originator cdef class SDPCallbackTimer(Timer): def __init__(self, int status, active_local, active_remote): self.status = status self.active_local = active_local self.active_remote = active_remote cdef class TransferStateCallbackTimer(Timer): def __init__(self, state, code, reason): self.state = state self.code = code self.reason = reason cdef class TransferResponseCallbackTimer(Timer): def __init__(self, method, rdata): self.method = method self.rdata = rdata cdef class TransferRequestCallbackTimer(Timer): def __init__(self, rdata): self.rdata = rdata class DialogID(tuple): call_id = property(itemgetter(0)) local_tag = property(itemgetter(1)) remote_tag = property(itemgetter(2)) def __new__(cls, call_id, local_tag, remote_tag): return tuple.__new__(cls, (call_id, local_tag, remote_tag)) def __repr__(self): return 'DialogID(call_id=%r, local_tag=%r, remote_tag=%r)' % self cdef class Invitation: expire_warning_time = 30 def __cinit__(self, *args, **kwargs): cdef int status self.weakref = weakref.ref(self) Py_INCREF(self.weakref) status = pj_mutex_create_recursive(_get_ua()._pjsip_endpoint._pool, "invitation_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) pj_list_init( &self._route_set) self._invite_session = NULL self._dialog = NULL self._reinvite_transaction = NULL self._transfer_usage = NULL self._sdp_neg_status = -1 self._failed_response = 0 self._timer = None self._transfer_timeout_timer = None self._transfer_refresh_timer = None self.from_header = None self.to_header = None self.request_uri = None self.route_header = None self.local_contact_header = None self.remote_contact_header = None self.credentials = None self.sdp = SDPPayloads() self.remote_user_agent = None self.state = None self.sub_state = None self.transport = None self.transfer_state = None self.direction = None self.call_id = None self.peer_address = None cdef int init_incoming(self, PJSIPUA ua, pjsip_rx_data *rdata, unsigned int inv_options) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session_ptr_const sdp cdef pjsip_dialog *replaced_dialog = NULL cdef pjsip_tpselector tp_sel cdef pjsip_tx_data *tdata = NULL cdef PJSTR contact_str cdef char *error_message with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: # Validate replaces header with nogil: status = pjsip_replaces_verify_request(rdata, &replaced_dialog, 0, &tdata) if status != 0: if tdata != NULL: pjsip_endpt_send_response2(ua._pjsip_endpoint._obj, rdata, tdata, NULL, NULL) else: pjsip_endpt_respond_stateless(ua._pjsip_endpoint._obj, rdata, 500, NULL, NULL, NULL) if status != 0: return 0 self.direction = "incoming" self.transport = rdata.tp_info.transport.type_name.lower() self.request_uri = FrozenSIPURI_create( pjsip_uri_get_uri(rdata.msg_info.msg.line.req.uri)) if _is_valid_ip(pj_AF_INET(), self.request_uri.host.encode()): self.local_contact_header = FrozenContactHeader(self.request_uri) else: self.local_contact_header = FrozenContactHeader(FrozenSIPURI(host=_pj_str_to_str(rdata.tp_info.transport.local_name.host), user=self.request_uri.user, port=rdata.tp_info.transport.local_name.port, parameters=(frozendict(transport=self.transport) if self.transport != "udp" else frozendict()))) contact_str = PJSTR(str(self.local_contact_header.body)) tp_sel.type = PJSIP_TPSELECTOR_TRANSPORT tp_sel.u.transport = rdata.tp_info.transport with nogil: status = pjsip_dlg_create_uas_and_inc_lock(pjsip_ua_instance(), rdata, &contact_str.pj_str, &self._dialog) if status != 0: error_message = "Could not create dialog for new INVITE session" else: pjsip_dlg_set_transport(self._dialog, &tp_sel) status = pjsip_inv_create_uas(self._dialog, rdata, NULL, inv_options, &self._invite_session) pjsip_dlg_dec_lock(self._dialog) if status != 0: error_message = "Could not create new INVITE session" else: status = pjsip_inv_initial_answer(self._invite_session, rdata, 100, NULL, NULL, &tdata) if status != 0: error_message = "Could not create initial (unused) response to INVITE" else: pjsip_tx_data_dec_ref(tdata) if status != 0: raise PJSIPError(error_message, status) if self._invite_session.neg != NULL: if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER: pjmedia_sdp_neg_get_neg_remote(self._invite_session.neg, &sdp) self.sdp.proposed_remote = FrozenSDPSession_create(sdp) self._invite_session.sdp_neg_flags = PJMEDIA_SDP_NEG_ALLOW_MEDIA_CHANGE self._invite_session.mod_data[ua._module.id] = self.weakref self.call_id = _pj_str_to_str(self._dialog.call_id.id) self.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) event_dict = dict(obj=self, prev_state=self.state, state="incoming", originator="remote") _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) - print(event_dict) self.state = "incoming" self.remote_user_agent = event_dict['headers']['User-Agent'].body if 'User-Agent' in event_dict['headers'] else None try: self.remote_contact_header = event_dict['headers']['Contact'][0] except LookupError: pass _add_event("SIPInvitationChangedState", event_dict) self.from_header = FrozenFromHeader_create(rdata.msg_info.from_hdr) self.to_header = FrozenToHeader_create(rdata.msg_info.to_hdr) except: if self._invite_session != NULL: with nogil: pjsip_inv_terminate(self._invite_session, 500, 0) self._invite_session = NULL elif self._dialog != NULL: with nogil: pjsip_dlg_terminate(self._dialog) self._dialog = NULL else: with nogil: status = pjsip_endpt_create_response(ua._pjsip_endpoint._obj, rdata, 500, NULL, &tdata) if status != 0: error_message = "Could not create response" else: status = pjsip_endpt_send_response2(ua._pjsip_endpoint._obj, rdata, tdata, NULL, NULL) if status != 0: pjsip_tx_data_dec_ref(tdata) error_message = "Could not send response" if status != 0: raise PJSIPError(error_message, status) raise finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int process_incoming_transfer(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: global _incoming_transfer_cb global _event_hdr_name cdef int status, status2 cdef dict rdata_dict = dict(obj=self) cdef pjsip_tx_data *tdata cdef pjsip_transaction *initial_tsx cdef Timer timer cdef char *error_message if self._transfer_usage != NULL: with nogil: status = pjsip_endpt_create_response(ua._pjsip_endpoint._obj, rdata, 480, NULL, &tdata) if status != 0: error_message = "Could not create response" else: status = pjsip_endpt_send_response2(ua._pjsip_endpoint._obj, rdata, tdata, NULL, NULL) if status != 0: pjsip_tx_data_dec_ref(tdata) error_message = "Could not send response" if status != 0: raise PJSIPError(error_message, status) return 0 _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) try: refer_to_hdr = rdata_dict["headers"]["Refer-To"] SIPURI.parse(refer_to_hdr.uri) except (KeyError, SIPCoreError): with nogil: status = pjsip_endpt_create_response(ua._pjsip_endpoint._obj, rdata, 400, NULL, &tdata) if status != 0: error_message = "Could not create response" else: status = pjsip_endpt_send_response2(ua._pjsip_endpoint._obj, rdata, tdata, NULL, NULL) if status != 0: pjsip_tx_data_dec_ref(tdata) error_message = "Could not send response" if status != 0: raise PJSIPError(error_message, status) return 0 try: self._set_transfer_state("INCOMING") _add_event("SIPInvitationTransferNewIncoming", rdata_dict) # PJSIP event framework needs an Event header, even if it's not needed for REFER, so we insert a fake one event_header = pjsip_msg_find_hdr_by_name(rdata.msg_info.msg, &_event_hdr_name.pj_str, NULL) if event_header == NULL: event_header = pjsip_event_hdr_create(rdata.tp_info.pool) event_header.event_type = _refer_event.pj_str pjsip_msg_add_hdr(rdata.msg_info.msg, event_header) initial_tsx = pjsip_rdata_get_tsx(rdata) with nogil: status = pjsip_evsub_create_uas(self._dialog, &_incoming_transfer_cb, rdata, 0, &self._transfer_usage) if status != 0: pjsip_tsx_terminate(initial_tsx, 500) error_message = "Could not create incoming REFER session" else: self._transfer_usage_role = PJSIP_ROLE_UAS pjsip_evsub_set_mod_data(self._transfer_usage, ua._event_module.id, self.weakref) status = pjsip_dlg_create_response(self._dialog, rdata, 202, NULL, &tdata) if status != 0: pjsip_tsx_terminate(initial_tsx, 500) error_message = "Could not create response for incoming REFER" else: pjsip_evsub_update_expires(self._transfer_usage, 90) status = pjsip_dlg_send_response(self._dialog, initial_tsx, tdata) if status != 0: status2 = pjsip_dlg_modify_response(self._dialog, tdata, 500, NULL) if status2 != 0: error_message = "Could not modify response" status = status2 else: pjsip_tx_data_dec_ref(tdata) # pjsip_dlg_modify_response() increases ref count unnecessarily error_message = "Could not send response" if status != 0: raise PJSIPError(error_message, status) except PJSIPError, e: code = 0 reason = e.args[0] if self._transfer_usage != NULL: with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) # Manually trigger the state callback since we handle the timeout ourselves state_timer = TransferStateCallbackTimer("TERMINATED", code, reason) state_timer.schedule(0, self._transfer_cb_state, self) raise else: self._set_transfer_state("ACTIVE") _add_event("SIPInvitationTransferDidStart", dict(obj=self)) timer = Timer() timer.schedule(0, self._start_incoming_transfer, self) return 0 cdef int process_incoming_options(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: cdef pjsip_tx_data *tdata cdef pjsip_transaction *initial_tsx cdef int status cdef char *error_message initial_tsx = pjsip_rdata_get_tsx(rdata) with nogil: status = pjsip_dlg_create_response(self._dialog, rdata, 200, NULL, &tdata) if status != 0: pjsip_tsx_terminate(initial_tsx, 500) error_message = "Could not create response for incoming OPTIONS" else: status = pjsip_dlg_send_response(self._dialog, initial_tsx, tdata) if status != 0: error_message = "Could not send response" if status != 0: raise PJSIPError(error_message, status) def send_invite(self, SIPURI request_uri not None, FromHeader from_header not None, ToHeader to_header not None, RouteHeader route_header not None, ContactHeader contact_header not None, SDPSession sdp not None, Credentials credentials=None, list extra_headers not None=list(), timeout=None): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session *local_sdp cdef pjsip_cred_info *cred_info cdef pjsip_replaces_hdr *pj_replaces_hdr cdef pjsip_route_hdr *route_set cdef pjsip_tx_data *tdata cdef PJSIPUA ua cdef PJSTR contact_str cdef PJSTR from_header_str cdef PJSTR to_header_str cdef PJSTR request_uri_str ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: route_set = &self._route_set if self.state is not None: raise SIPCoreInvalidStateError('Can only transition to the "outgoing" state from the "None" state, currently in the "%s" state' % self.state) if timeout is not None and timeout <= 0: raise ValueError("Timeout value must be positive") - self.transport = route_header.uri.transport + self.transport = route_header.uri.transport.decode() self.direction = "outgoing" self.credentials = FrozenCredentials.new(credentials) if credentials is not None else None - self.request_uri = FrozenSIPURI.new(request_uri) - self.route_header = FrozenRouteHeader.new(route_header) - self.route_header.uri.parameters.dict[b"lr"] = None # always send lr parameter in Route header - self.route_header.uri.parameters.dict[b"hide"] = None # always hide Route header self.local_contact_header = FrozenContactHeader.new(contact_header) self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else None from_header_parameters = from_header.parameters.copy() from_header_parameters.pop("tag", None) from_header.parameters = {} - from_header_str = PJSTR(from_header.body) + from_header_str = PJSTR(from_header.body.encode()) to_header_parameters = to_header.parameters.copy() to_header_parameters.pop("tag", None) to_header.parameters = {} - to_header_str = PJSTR(to_header.body) - contact_str = PJSTR(str(self.local_contact_header.body)) - request_uri_str = PJSTR(str(request_uri)) + to_header_str = PJSTR(to_header.body.encode()) + contact_str = PJSTR(str(self.local_contact_header.body).encode()) + self.request_uri = FrozenSIPURI.new(request_uri) + struri = str(request_uri) + request_uri_str = PJSTR(struri.encode()) + + self.route_header = FrozenRouteHeader.new(route_header) + self.route_header.uri.parameters.dict["lr"] = None # always send lr parameter in Route header + self.route_header.uri.parameters.dict["hide"] = None # always hide Route header with nogil: status = pjsip_dlg_create_uac(pjsip_ua_instance(), &from_header_str.pj_str, &contact_str.pj_str, &to_header_str.pj_str, &request_uri_str.pj_str, &self._dialog) if status != 0: raise PJSIPError("Could not create dialog for outgoing INVITE session", status) with nogil: pjsip_dlg_inc_lock(self._dialog) if contact_header.expires is not None: self._dialog.local.contact.expires = contact_header.expires if contact_header.q is not None: self._dialog.local.contact.q1000 = int(contact_header.q*1000) contact_parameters = contact_header.parameters.copy() contact_parameters.pop("q", None) contact_parameters.pop("expires", None) _dict_to_pjsip_param(contact_parameters, &self._dialog.local.contact.other_param, self._dialog.pool) _dict_to_pjsip_param(from_header_parameters, &self._dialog.local.info.other_param, self._dialog.pool) _dict_to_pjsip_param(to_header_parameters, &self._dialog.remote.info.other_param, self._dialog.pool) self.from_header = FrozenFromHeader_create(self._dialog.local.info) self.to_header = FrozenToHeader.new(to_header) self.call_id = _pj_str_to_str(self._dialog.call_id.id) local_sdp = self.sdp.proposed_local.get_sdp_session() if sdp is not None else NULL with nogil: status = pjsip_inv_create_uac(self._dialog, local_sdp, 0, &self._invite_session) if status != 0: raise PJSIPError("Could not create outgoing INVITE session", status) self._invite_session.sdp_neg_flags = PJMEDIA_SDP_NEG_ALLOW_MEDIA_CHANGE self._invite_session.mod_data[ua._module.id] = self.weakref if self.credentials is not None: cred_info = self.credentials.get_cred_info() with nogil: status = pjsip_auth_clt_set_credentials(&self._dialog.auth_sess, 1, cred_info) if status != 0: raise PJSIPError("Could not set credentials for INVITE session", status) _BaseRouteHeader_to_pjsip_route_hdr(self.route_header, &self._route_header, self._dialog.pool) pj_list_insert_after( &self._route_set, &self._route_header) with nogil: status = pjsip_dlg_set_route_set(self._dialog, route_set) if status != 0: raise PJSIPError("Could not set route for INVITE session", status) with nogil: status = pjsip_inv_invite(self._invite_session, &tdata) if status != 0: raise PJSIPError("Could not create INVITE message", status) replaces_headers = [header for header in extra_headers if isinstance(header, BaseReplacesHeader)] if len(replaces_headers) > 1: raise SIPCoreError("Only one Replaces header is allowed") try: replaces_header = replaces_headers[0] except IndexError: pass else: extra_headers.remove(replaces_header) pj_replaces_hdr = pjsip_replaces_hdr_create(self._dialog.pool) _str_to_pj_str(replaces_header.call_id, &pj_replaces_hdr.call_id) _str_to_pj_str(replaces_header.to_tag, &pj_replaces_hdr.to_tag) _str_to_pj_str(replaces_header.from_tag, &pj_replaces_hdr.from_tag) _dict_to_pjsip_param(replaces_header.parameters, &pj_replaces_hdr.other_param, self._dialog.pool) pjsip_msg_add_hdr(tdata.msg, pj_replaces_hdr) _add_headers_to_tdata(tdata, extra_headers) with nogil: status = pjsip_inv_send_msg(self._invite_session, tdata) if status != 0: raise PJSIPError("Could not send initial INVITE", status) if timeout is not None: self._timer = Timer() self._timer.schedule(timeout, self._cb_timer_disconnect, self) with nogil: pjsip_dlg_dec_lock(self._dialog) except Exception, e: if isinstance(e, PJSIPError) and e.errno == EADDRNOTAVAIL: self._invite_session = NULL pjsip_dlg_dec_lock(self._dialog) self._dialog = NULL raise if self._invite_session != NULL: pjsip_inv_terminate(self._invite_session, 500, 0) self._invite_session = NULL elif self._dialog != NULL: pjsip_dlg_dec_lock(self._dialog) self._dialog = NULL raise finally: with nogil: pj_mutex_unlock(lock) def send_response(self, int code, str reason=None, BaseContactHeader contact_header=None, BaseSDPSession sdp=None, list extra_headers not None=list()): cdef int status cdef int clean_tdata = 0 cdef pj_mutex_t *lock = self._lock cdef pj_str_t reason_str cdef pjmedia_sdp_session_ptr_const lsdp = NULL cdef pjmedia_sdp_session *local_sdp cdef pjsip_inv_session *invite_session cdef pjsip_msg_body *body cdef pjsip_tx_data *tdata cdef PJSIPUA ua ua = _get_ua() - print('invitation send_response') with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session if reason is not None: _str_to_pj_str(reason, &reason_str) if self.state not in ("incoming", "early", "connected"): raise SIPCoreInvalidStateError('Can only send response from the "incoming", "early" and "connected" states current in the "%s" state.' % self.state) if self.state == "early" and self.direction != "incoming": raise SIPCoreInvalidStateError('Cannot send response in the "early" state for an outgoing INVITE') if self.state == "connected" and self.sub_state not in ("received_proposal", "received_proposal_request"): raise SIPCoreInvalidStateError('Cannot send response in the "connected" state if a proposal has not been received') if contact_header is not None: self._update_contact_header(contact_header) if 200 <= code < 300 and sdp is None: raise SIPCoreError("Local SDP needs to be set for a positive response") if code >= 300 and sdp is not None: raise SIPCoreError("Local SDP cannot be specified for a negative response") self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else None local_sdp = self.sdp.proposed_local.get_sdp_session() if sdp is not None else NULL - print('pjmedia_sdp_neg_modify_local_offer is next') if sdp is not None and self.sdp.proposed_remote is None: # There was no remote proposal, this is a reply with an offer with nogil: status = pjmedia_sdp_neg_modify_local_offer(self._dialog.pool, invite_session.neg, local_sdp) if status != 0: raise PJSIPError("Could not modify local SDP offer", status) # Retrieve the "fixed" offer from negotiator pjmedia_sdp_neg_get_neg_local(invite_session.neg, &lsdp) local_sdp = lsdp - print('pjsip_inv_answer is next') with nogil: status = pjsip_inv_answer(invite_session, code, &reason_str if reason is not None else NULL, local_sdp, &tdata) if status != 0: raise PJSIPError("Could not create %d reply to INVITE" % code, status) _add_headers_to_tdata(tdata, extra_headers) - print('pjsip_inv_send_msg is next') with nogil: status = pjsip_inv_send_msg(invite_session, tdata) if status != 0: exc = PJSIPError("Could not send %d response" % code, status) if sdp is not None and self.sdp.proposed_remote is not None and exc.errno in (EADDRNOTAVAIL, ENETUNREACH): self._failed_response = 1 raise exc self._failed_response = 0 finally: with nogil: pj_mutex_unlock(lock) def send_reinvite(self, BaseContactHeader contact_header=None, BaseSDPSession sdp=None, list extra_headers not None=list()): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session *local_sdp cdef pjsip_inv_session *invite_session cdef pjsip_tx_data *tdata cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session if self.state != "connected": raise SIPCoreError('Can only send re-INVITE in "connected" state, not "%s" state' % self.state) if self.sub_state != "normal": raise SIPCoreError('Can only send re-INVITE if no another re-INVITE transaction is active') if contact_header is not None: self._update_contact_header(contact_header) self.sdp.proposed_local = FrozenSDPSession.new(sdp) if sdp is not None else self.sdp.active_local local_sdp = self.sdp.proposed_local.get_sdp_session() with nogil: status = pjsip_inv_reinvite(invite_session, NULL, local_sdp, &tdata) if status != 0: raise PJSIPError("Could not create re-INVITE message", status) _add_headers_to_tdata(tdata, extra_headers) with nogil: status = pjsip_inv_send_msg(invite_session, tdata) if status != 0: raise PJSIPError("Could not send re-INVITE", status) self._failed_response = 0 # TODO: use a callback tiner here instead? self._reinvite_transaction = self._invite_session.invite_tsx self.sub_state = "sent_proposal" event_dict = dict(obj=self, prev_state="connected", state="connected", prev_sub_state="normal", sub_state="sent_proposal", originator="local") _pjsip_msg_to_dict(tdata.msg, event_dict) _add_event("SIPInvitationChangedState", event_dict) finally: with nogil: pj_mutex_unlock(lock) def cancel_reinvite(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjsip_inv_session *invite_session cdef pjsip_tx_data *tdata cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session if not self.sub_state == "sent_proposal": raise SIPCoreError("re-INVITE can only be cancelled if INVITE session is in 'sent_proposal' sub state") if self._invite_session == NULL: raise SIPCoreError("INVITE session is not active") if self._reinvite_transaction == NULL: raise SIPCoreError("there is no active re-INVITE transaction") with nogil: status = pjsip_inv_cancel_reinvite(invite_session, &tdata) if status != 0: raise PJSIPError("Could not create message to CANCEL re-INVITE transaction", status) if tdata != NULL: with nogil: status = pjsip_inv_send_msg(invite_session, tdata) if status != 0: raise PJSIPError("Could not send %s" % _pj_str_to_str(tdata.msg.line.req.method.name), status) finally: with nogil: pj_mutex_unlock(lock) def transfer(self, SIPURI target_uri, object replaced_dialog_id=None, list extra_headers not None=list()): global _refer_event global _refer_method cdef int status cdef PJSIPUA ua cdef pj_mutex_t *lock = self._lock cdef pjsip_method refer_method cdef pjsip_tx_data *tdata cdef dict tdata_dict = dict(obj=self) ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state != "connected": raise SIPCoreError('Can only start transfer in "connected" state, not "%s" state' % self.state) if self._transfer_usage != NULL: raise SIPCoreError('Another transfer is in progress') with nogil: status = pjsip_evsub_create_uac(self._dialog, &_transfer_cb, &_refer_event.pj_str, PJSIP_EVSUB_NO_EVENT_ID, &self._transfer_usage) if status != 0: raise PJSIPError("Could not create REFER", status) self._transfer_usage_role = PJSIP_ROLE_UAC pjsip_evsub_set_mod_data(self._transfer_usage, ua._event_module.id, self.weakref) pjsip_method_init_np(&refer_method, &_refer_method.pj_str) with nogil: status = pjsip_evsub_initiate(self._transfer_usage, &refer_method, -1, &tdata) if status != 0: raise PJSIPError("Could not create REFER message", status) if replaced_dialog_id is not None and None not in replaced_dialog_id: target_uri.headers["Replaces"] = "%s;from-tag=%s;to-tag=%s" % replaced_dialog_id refer_to_header = ReferToHeader(str(target_uri)) _add_headers_to_tdata(tdata, [refer_to_header, Header('Referred-By', str(self.local_identity.uri))]) _add_headers_to_tdata(tdata, extra_headers) # We can't remove the Event header or PJSIP will fail to match responses to this request _remove_headers_from_tdata(tdata, ["Expires"]) with nogil: status = pjsip_evsub_send_request(self._transfer_usage, tdata) if status != 0: raise PJSIPError("Could not send REFER message", status) _pjsip_msg_to_dict(tdata.msg, tdata_dict) _add_event("SIPInvitationTransferNewOutgoing", tdata_dict) self._transfer_timeout_timer = Timer() self._transfer_timeout_timer.schedule(90, self._transfer_cb_timeout_timer, self) finally: with nogil: pj_mutex_unlock(lock) def notify_transfer_progress(self, int code, str reason=None): cdef int status cdef PJSIPUA ua cdef pj_mutex_t *lock = self._lock ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._transfer_usage == NULL: raise SIPCoreError("No transfer is in progress") if self._transfer_usage_role != PJSIP_ROLE_UAS: raise SIPCoreError("Transfer progress can only be notified by the transfer UAS") self._set_sipfrag_payload(code, reason) if 200 <= code < 700: self._terminate_transfer_uas() else: self._send_notify() finally: with nogil: pj_mutex_unlock(lock) def end(self, list extra_headers not None=list(), timeout=None): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjsip_inv_session *invite_session cdef pjsip_tx_data *tdata cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session if self.state == "disconnected": return if self.state == "disconnecting": raise SIPCoreError('INVITE session is already in the "disconnecting" state') if self._invite_session == NULL: raise SIPCoreError("INVITE session is not active") if self.state not in ("outgoing", "early", "connecting", "connected"): raise SIPCoreError('Can only end the INVITE dialog from the "outgoing", "early", "connecting" and "connected" states' + 'current in the "%s" state.' % self.state) if self.state == "early" and self.direction != "outgoing": raise SIPCoreError('Cannot end incoming INVITE dialog while in the "early" state') if timeout is not None and timeout <= 0: raise ValueError("Timeout value cannot be negative") # End ongoing transfer self._terminate_transfer() with nogil: status = pjsip_inv_end_session(invite_session, 0, NULL, &tdata) if status != 0: raise PJSIPError("Could not create message to end INVITE session", status) if tdata != NULL: _add_headers_to_tdata(tdata, extra_headers) with nogil: status = pjsip_inv_send_msg(invite_session, tdata) if status != 0: raise PJSIPError("Could not send %s" % _pj_str_to_str(tdata.msg.line.req.method.name), status) if self._timer is not None: self._timer.cancel() self._timer = None if timeout is not None and timeout > 0: self._timer = Timer() self._timer.schedule(timeout, self._cb_timer_disconnect, self) event_dict = dict(obj=self, prev_state=self.state, state="disconnecting", originator="local") if self.state == "connected": event_dict["prev_sub_state"] = self.sub_state self.state = "disconnecting" self.sub_state = None if tdata != NULL: _pjsip_msg_to_dict(tdata.msg, event_dict) _add_event("SIPInvitationChangedState", event_dict) finally: with nogil: pj_mutex_unlock(lock) property local_identity: def __get__(self): if self.direction == 'outgoing': return self.from_header elif self.direction == 'incoming': return self.to_header else: return None property remote_identity: def __get__(self): if self.direction == 'incoming': return self.from_header elif self.direction == 'outgoing': return self.to_header else: return None property dialog_id: def __get__(self): local_tag = remote_tag = None if self.local_identity is not None: local_tag = self.local_identity.tag if self.remote_identity is not None: remote_tag = self.remote_identity.tag return DialogID(self.call_id, local_tag, remote_tag) cdef PJSIPUA _check_ua(self): try: return _get_ua() except: self.state = "disconnected" self.sub_state = None self._dialog = NULL self._invite_session = NULL self._reinvite_transaction = NULL cdef int _do_dealloc(self) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjsip_inv_session *invite_session cdef PJSIPUA ua try: ua = _get_ua() except SIPCoreError: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session if self._invite_session != NULL: self._invite_session.mod_data[ua._module.id] = NULL if self.state != "disconnecting": with nogil: pjsip_inv_terminate(invite_session, 481, 0) self._dialog = NULL self._invite_session = NULL self._reinvite_transaction = NULL if self._timer is not None: self._timer.cancel() self._timer = None finally: with nogil: pj_mutex_unlock(lock) return 0 def __dealloc__(self): cdef Timer timer self._do_dealloc() if self._lock != NULL: pj_mutex_destroy(self._lock) timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef int _update_contact_header(self, BaseContactHeader contact_header) except -1: # The PJSIP functions called here don't do much, so there is no need to call them # without the gil. cdef pj_str_t contact_str_pj cdef pjsip_uri *contact contact_str = str(contact_header.uri) if contact_header.display_name: contact_str = "%s <%s>" % (contact_header.display_name, contact_str) - print('Built contact_str %s' % contact_str) pj_strdup2_with_null(self._dialog.pool, &contact_str_pj, contact_str.encode()) contact = pjsip_parse_uri(self._dialog.pool, contact_str_pj.ptr, contact_str_pj.slen, PJSIP_PARSE_URI_AS_NAMEADDR) if contact == NULL: raise SIPCoreError("Not a valid Contact header: %s" % contact_str) self._dialog.local.contact = pjsip_contact_hdr_create(self._dialog.pool) self._dialog.local.contact.uri = contact if contact_header.expires is not None: self._dialog.local.contact.expires = contact_header.expires if contact_header.q is not None: self._dialog.local.contact.q1000 = int(contact_header.q*1000) parameters = contact_header.parameters.copy() parameters.pop("q", None) parameters.pop("expires", None) _dict_to_pjsip_param(parameters, &self._dialog.local.contact.other_param, self._dialog.pool) self.local_contact_header = FrozenContactHeader.new(contact_header) return 0 cdef int _fail(self, PJSIPUA ua) except -1: cdef Timer timer ua._handle_exception(0) if self._transfer_usage != NULL: with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) pjsip_evsub_set_mod_data(self._transfer_usage, ua._event_module.id, NULL) if self._transfer_timeout_timer is not None: self._transfer_timeout_timer.cancel() self._transfer_timeout_timer = None if self._transfer_refresh_timer is not None: self._transfer_refresh_timer.cancel() self._transfer_refresh_timer = None self._transfer_usage = NULL _add_event("SIPInvitationTransferDidFail", dict(obj=self, code=0, reason="internal error")) self._invite_session.mod_data[ua._module.id] = NULL if self.state != "disconnected": event_dict = dict(obj=self, prev_state=self.state, state="disconnected", originator="local", code=0, reason="internal error", disconnect_reason="internal error") if self.state == "connected": event_dict["prev_sub_state"] = self.sub_state self.state = "disconnected" self.sub_state = None _add_event("SIPInvitationChangedState", event_dict) # calling do_dealloc from within a callback makes PJSIP crash # the handler will be executed after pjsip_endpt_handle_events returns timer = Timer() timer.schedule(0, self._cb_postpoll_fail, self) return 0 cdef int _cb_state(self, StateCallbackTimer timer) except -1: cdef int status cdef bint pjsip_error = False cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session_ptr_const sdp cdef pjsip_inv_session *invite_session cdef object state cdef object sub_state cdef object rdata cdef object tdata cdef object originator cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: invite_session = self._invite_session state = timer.state sub_state = timer.sub_state rdata = timer.rdata tdata = timer.tdata originator = timer.originator if state != "early" and state == self.state and sub_state == self.sub_state: return 0 if state == "connected": if self.state == "connecting" and self._sdp_neg_status != 0: self.end() return 0 if state == "disconnected" and self.state != "disconnecting": # the invite session may have been destroyed if it failed if not self._invite_session: return 0 # we either sent a cancel or a negative reply to an incoming INVITE if self._invite_session.cancelling or (self.state in ("incoming", "early") and self.direction == "incoming" and rdata is None): # we caused the disconnect so send the transition to the disconnecting state pjsip_error = True event_dict = dict(obj=self, prev_state=self.state, state="disconnecting", originator="local") self.state = "disconnecting" _add_event("SIPInvitationChangedState", event_dict) if self.direction == "outgoing" and state in ('connecting', 'connected') and self.state in ('outgoing', 'early') and rdata is not None: self.to_header = rdata['headers']['To'] if self.direction == "incoming" and state in ('connecting', 'connected') and self.state in ('incoming', 'early') and tdata is not None: self.to_header = tdata['headers']['To'] event_dict = dict(obj=self, prev_state=self.state, state=state) if self.state == "connected": event_dict["prev_sub_state"] = self.sub_state if state == "connected": event_dict["sub_state"] = sub_state event_dict["originator"] = originator if rdata is not None: event_dict.update(rdata) if tdata is not None: event_dict.update(tdata) if rdata is None and tdata is None: event_dict['headers'] = dict() event_dict['body'] = None if self.remote_user_agent is None and state in ('connecting', 'connected') and rdata is not None: if 'User-Agent' in event_dict['headers']: self.remote_user_agent = event_dict['headers']['User-Agent'].body elif 'Server' in event_dict['headers']: self.remote_user_agent = event_dict['headers']['Server'].body if state not in ('disconnecting', 'disconnected') and rdata is not None: try: self.remote_contact_header = event_dict['headers']['Contact'][0] except LookupError: pass if state == "connected": if sub_state == "received_proposal": self._reinvite_transaction = self._invite_session.invite_tsx if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER: pjmedia_sdp_neg_get_neg_remote(self._invite_session.neg, &sdp) self.sdp.proposed_remote = FrozenSDPSession_create(sdp) elif sub_state == "sent_proposal": if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER: pjmedia_sdp_neg_get_neg_local(self._invite_session.neg, &sdp) self.sdp.proposed_local = FrozenSDPSession_create(sdp) elif sub_state == "received_proposal_request": self._reinvite_transaction = self._invite_session.invite_tsx if pjmedia_sdp_neg_get_state(self._invite_session.neg) == PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER: pjmedia_sdp_neg_get_neg_local(self._invite_session.neg, &sdp) self.sdp.proposed_local = FrozenSDPSession_create(sdp) elif self.sub_state in ("received_proposal", "sent_proposal", "received_proposal_request"): if (rdata, tdata) == (None, None): event_dict['code'] = 408 event_dict['reason'] = 'Request Timeout' if pjmedia_sdp_neg_get_state(self._invite_session.neg) in (PJMEDIA_SDP_NEG_STATE_LOCAL_OFFER, PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER): pjmedia_sdp_neg_cancel_offer(self._invite_session.neg) self._reinvite_transaction = NULL if state == "disconnected": event_dict["disconnect_reason"] = "user request" if not pjsip_error else "internal error" event_dict["code"] = self._invite_session.cause if self._invite_session.cause > 0: event_dict["reason"] = _pj_str_to_str(self._invite_session.cause_text) else: event_dict["reason"] = "" if not self._invite_session.cancelling and rdata is None and self._invite_session.cause > 0: # pjsip internally generates 408 and 503 if self._invite_session.cause == 408: if self.direction == "incoming" and self.state == "connecting": event_dict["disconnect_reason"] = "missing ACK" else: event_dict["disconnect_reason"] = "timeout" else: event_dict["disconnect_reason"] = _pj_str_to_str(self._invite_session.cause_text) elif self._invite_session.cancelling and rdata is None and self._invite_session.cause == 408 and self.state == "disconnecting": # silly pjsip sets cancelling field when we call pjsip_inv_end_session in end even if we send a BYE event_dict["disconnect_reason"] = "timeout" elif rdata is not None and 'Reason' in event_dict['headers']: try: reason = event_dict['headers']['Reason'].text if reason: event_dict["disconnect_reason"] = reason except (ValueError, IndexError): pass if self._transfer_usage != NULL: with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) pjsip_evsub_set_mod_data(self._transfer_usage, ua._event_module.id, NULL) if self._transfer_timeout_timer is not None: self._transfer_timeout_timer.cancel() self._transfer_timeout_timer = None if self._transfer_refresh_timer is not None: self._transfer_refresh_timer.cancel() self._transfer_refresh_timer = None self._transfer_usage = NULL _add_event("SIPInvitationTransferDidFail", dict(obj=self, code=0, reason="invite dialog ended")) self._invite_session.mod_data[ua._module.id] = NULL self._invite_session = NULL self._dialog = NULL if self._timer is not None: self._timer.cancel() self._timer = None elif state in ("early", "connecting") and self._timer is not None: self._timer.cancel() self._timer = None self.state = state self.sub_state = sub_state _add_event("SIPInvitationChangedState", event_dict) finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _cb_sdp_done(self, SDPCallbackTimer timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._failed_response == 1: return 0 self._sdp_neg_status = timer.status self.sdp.proposed_local = None self.sdp.proposed_remote = None if timer.status == 0: self.sdp.active_local = timer.active_local self.sdp.active_remote = timer.active_remote if self.state in ["disconnecting", "disconnected"]: return 0 event_dict = dict(obj=self, succeeded=timer.status == 0) if timer.status == 0: event_dict["local_sdp"] = self.sdp.active_local event_dict["remote_sdp"] = self.sdp.active_remote else: event_dict["error"] = _pj_status_to_str(timer.status) _add_event("SIPInvitationGotSDPUpdate", event_dict) finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _cb_timer_disconnect(self, timer) except -1: cdef pjsip_inv_session *invite_session = self._invite_session with nogil: pjsip_inv_terminate(invite_session, 408, 1) cdef int _cb_postpoll_fail(self, timer) except -1: self._do_dealloc() cdef int _start_incoming_transfer(self, timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: self._set_sipfrag_payload(100, "Trying") self._send_notify() finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _terminate_transfer(self) except -1: if self._transfer_usage == NULL: return 0 if self._transfer_usage_role == PJSIP_ROLE_UAC: self._terminate_transfer_uac() else: self._terminate_transfer_uas() cdef int _terminate_transfer_uac(self) except -1: cdef pjsip_tx_data *tdata cdef int status cdef TransferStateCallbackTimer state_timer try: with nogil: status = pjsip_evsub_initiate(self._transfer_usage, NULL, 0, &tdata) if status != 0: raise PJSIPError("Could not create SUBSCRIBE message", status) with nogil: status = pjsip_evsub_send_request(self._transfer_usage, tdata) if status != 0: raise PJSIPError("Could not send SUBSCRIBE message", status) if self._transfer_timeout_timer is not None: self._transfer_timeout_timer.cancel() self._transfer_timeout_timer = None if self._transfer_refresh_timer is not None: self._transfer_refresh_timer.cancel() self._transfer_refresh_timer = None self._transfer_timeout_timer = Timer() self._transfer_timeout_timer.schedule(1, self._transfer_cb_timeout_timer, self) except PJSIPError, e: if self._transfer_usage != NULL: code = 0 reason = e.args[0] with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) # Manually trigger the state callback since we handle the timeout ourselves state_timer = TransferStateCallbackTimer("TERMINATED", code, reason) state_timer.schedule(0, self._transfer_cb_state, self) cdef int _terminate_transfer_uas(self) except -1: global sipfrag_re cdef int code cdef TransferStateCallbackTimer state_timer if self.transfer_state == "TERMINATED": return 0 self._set_transfer_state("TERMINATED") self._send_notify() with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) match = sipfrag_re.match(self._sipfrag_payload.str) code = int(match.group('code')) reason = match.group('reason') state_timer = TransferStateCallbackTimer("TERMINATED", code, reason) state_timer.schedule(0, self._transfer_cb_state, self) cdef int _set_transfer_state(self, str state) except -1: cdef str prev_state prev_state = self.transfer_state self.transfer_state = state if prev_state != state: _add_event("SIPInvitationTransferChangedState", dict(obj=self, prev_state=prev_state, state=state)) cdef int _set_sipfrag_payload(self, int code, str status) except -1: cdef str content if status is None: try: status = sip_status_messages[code] except IndexError: status = "Unknown" - content = "SIP/2.0 %d %s\r\n" % (code, status) + content = b"SIP/2.0 %d %s\r\n" % (code, status) self._sipfrag_payload = PJSTR(content) cdef int _send_notify(self) except -1: cdef pjsip_evsub_state state cdef pj_str_t *reason_p = NULL cdef pjsip_tx_data *tdata cdef int status cdef dict _sipfrag_version = dict(version="2.0") cdef PJSTR _content_type = PJSTR("message") cdef PJSTR _content_subtype = PJSTR("sipfrag") cdef PJSTR noresource = PJSTR("noresource") cdef PJSTR content if self.transfer_state == "ACTIVE": state = PJSIP_EVSUB_STATE_ACTIVE else: state = PJSIP_EVSUB_STATE_TERMINATED reason_p = &noresource.pj_str with nogil: status = pjsip_evsub_notify(self._transfer_usage, state, NULL, reason_p, &tdata) if status != 0: raise PJSIPError("Could not create NOTIFY request", status) if self.transfer_state in ("ACTIVE", "TERMINATED"): tdata.msg.body = pjsip_msg_body_create(tdata.pool, &_content_type.pj_str, &_content_subtype.pj_str, &self._sipfrag_payload.pj_str) _dict_to_pjsip_param(_sipfrag_version, &tdata.msg.body.content_type.param, tdata.pool) with nogil: status = pjsip_evsub_send_request(self._transfer_usage, tdata) if status != 0: raise PJSIPError("Could not send NOTIFY request", status) return 0 cdef int _transfer_cb_timeout_timer(self, timer) except -1: global sip_status_messages cdef int code cdef str reason cdef int status cdef TransferStateCallbackTimer state_timer cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._transfer_usage != NULL: code = PJSIP_SC_TSX_TIMEOUT reason = sip_status_messages[PJSIP_SC_TSX_TIMEOUT] with nogil: pjsip_evsub_terminate(self._transfer_usage, 0) # Manually trigger the state callback since we handle the timeout ourselves state_timer = TransferStateCallbackTimer("TERMINATED", code, reason) state_timer.schedule(0, self._transfer_cb_state, self) finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _transfer_cb_refresh_timer(self, timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: self._terminate_transfer() finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _transfer_cb_state(self, TransferStateCallbackTimer timer) except -1: cdef int status cdef str prev_state cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: prev_state = self.transfer_state self._set_transfer_state(timer.state) if timer.state == "ACCEPTED" and prev_state == "SENT": _add_event("SIPInvitationTransferDidStart", dict(obj=self)) elif timer.state == "TERMINATED": # If a NOTIFY is rejected with 408 or 481 PJSIP will erase the subscription if self._transfer_usage != NULL: pjsip_evsub_set_mod_data(self._transfer_usage, ua._event_module.id, NULL) if self._transfer_timeout_timer is not None: self._transfer_timeout_timer.cancel() self._transfer_timeout_timer = None if self._transfer_refresh_timer is not None: self._transfer_refresh_timer.cancel() self._transfer_refresh_timer = None self._transfer_usage = NULL if timer.code/100 == 2: _add_event("SIPInvitationTransferDidEnd", dict(obj=self)) else: _add_event("SIPInvitationTransferDidFail", dict(obj=self, code=timer.code, reason=timer.reason)) finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _transfer_cb_response(self, TransferResponseCallbackTimer timer) except -1: cdef int expires cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._transfer_timeout_timer is not None: self._transfer_timeout_timer.cancel() self._transfer_timeout_timer = None finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _transfer_cb_notify(self, TransferRequestCallbackTimer timer) except -1: cdef pj_time_val refresh cdef int expires cdef dict notify_dict = dict(obj=self) cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: sub_state_hdr = timer.rdata["headers"].get("Subscription-State", None) if self.transfer_state != "TERMINATED" and sub_state_hdr is not None and sub_state_hdr.expires > 0: if self._transfer_refresh_timer is not None: self._transfer_refresh_timer.cancel() self._transfer_refresh_timer = None expires = max(1, sub_state_hdr.expires - self.expire_warning_time, sub_state_hdr.expires/2) self._transfer_refresh_timer = Timer() self._transfer_refresh_timer.schedule(expires, self._transfer_cb_refresh_timer, self) notify_dict["request_uri"] = timer.rdata["request_uri"] notify_dict["from_header"] = timer.rdata["headers"].get("From", None) notify_dict["to_header"] = timer.rdata["headers"].get("To", None) notify_dict["headers"] = timer.rdata["headers"] notify_dict["body"] = timer.rdata["body"] content_type = notify_dict["headers"].get("Content-Type", None) notify_dict["content_type"] = content_type.content_type if content_type else None event = notify_dict["headers"].get("Event", None) notify_dict["event"] = event.event if event else None _add_event("SIPInvitationTransferGotNotify", notify_dict) finally: with nogil: pj_mutex_unlock(lock) return 0 cdef int _transfer_cb_server_timeout(self, timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return 0 with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: self._terminate_transfer() finally: with nogil: pj_mutex_unlock(lock) return 0 # Callback functions # cdef void _Invitation_cb_state(pjsip_inv_session *inv, pjsip_event *e) with gil: cdef pjsip_rx_data *rdata = NULL cdef pjsip_tx_data *tdata = NULL cdef object state cdef object rdata_dict = None cdef object tdata_dict = None cdef object originator = None cdef Invitation invitation cdef PJSIPUA ua cdef StateCallbackTimer timer try: ua = _get_ua() except: return try: if inv.state == PJSIP_INV_STATE_INCOMING: return if inv.mod_data[ua._module.id] != NULL: invitation = ( inv.mod_data[ua._module.id])() if invitation is None: return state = pjsip_inv_state_name(inv.state).lower() sub_state = None if state == "calling": state = "outgoing" elif state == "confirmed": state = "connected" sub_state = "normal" elif state == "disconnctd": state = "disconnected" if e != NULL: if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_TX_MSG: tdata = e.body.tsx_state.src.tdata if (tdata.msg.type == PJSIP_RESPONSE_MSG and tdata.msg.line.status.code == 487 and state == "disconnected" and invitation.state in ["incoming", "early"]): return elif e.type == PJSIP_EVENT_RX_MSG: rdata = e.body.rx_msg.rdata elif e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_RX_MSG: if (inv.state != PJSIP_INV_STATE_CONFIRMED or e.body.tsx_state.src.rdata.msg_info.msg.type == PJSIP_REQUEST_MSG): rdata = e.body.tsx_state.src.rdata elif e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_TRANSPORT_ERROR and e.body.tsx_state.tsx.role == PJSIP_ROLE_UAC: # A transport error occurred, fake a local reply rdata_dict = dict() rdata_dict["code"] = 408 rdata_dict["reason"] = "Transport Error" rdata_dict["headers"] = dict() rdata_dict["body"] = None originator = "local" if rdata != NULL: if invitation.peer_address is None: invitation.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: invitation.peer_address.ip = rdata.pkt_info.src_name invitation.peer_address.port = rdata.pkt_info.src_port rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) originator = "remote" if tdata != NULL: tdata_dict = dict() _pjsip_msg_to_dict(tdata.msg, tdata_dict) originator = "local" try: timer = StateCallbackTimer(state, sub_state, rdata_dict, tdata_dict, originator) timer.schedule(0, invitation._cb_state, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef void _Invitation_cb_sdp_done(pjsip_inv_session *inv, int status) with gil: cdef Invitation invitation cdef PJSIPUA ua cdef SDPCallbackTimer timer cdef pjmedia_sdp_session_ptr_const sdp try: ua = _get_ua() except: return try: if inv.mod_data[ua._module.id] != NULL: invitation = ( inv.mod_data[ua._module.id])() if invitation is None: return if status == 0: if pjmedia_sdp_neg_get_active_local(invitation._invite_session.neg, &sdp) == 0: local_sdp = SDPSession_create(sdp) else: local_sdp = None if pjmedia_sdp_neg_get_active_remote(invitation._invite_session.neg, &sdp) == 0: remote_sdp = SDPSession_create(sdp) else: remote_sdp = None if local_sdp is None or remote_sdp is None: active_local = None active_remote = None else: if len(local_sdp.media) > len(remote_sdp.media): local_sdp.media = local_sdp.media[:len(remote_sdp.media)] if len(remote_sdp.media) > len(local_sdp.media): remote_sdp.media = remote_sdp.media[:len(local_sdp.media)] for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] if not local_media.port and remote_media.port: remote_media.port = 0 if not remote_media.port and local_media.port: local_media.port = 0 active_local = FrozenSDPSession.new(local_sdp) active_remote = FrozenSDPSession.new(remote_sdp) else: active_local = None active_remote = None try: timer = SDPCallbackTimer(status, active_local, active_remote) timer.schedule(0, invitation._cb_sdp_done, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef int _Invitation_cb_rx_reinvite(pjsip_inv_session *inv, pjmedia_sdp_session_ptr_const offer, pjsip_rx_data *rdata) with gil: cdef int status cdef pjsip_tx_data *answer_tdata cdef object rdata_dict = None cdef Invitation invitation cdef PJSIPUA ua cdef StateCallbackTimer timer try: ua = _get_ua() except: return 1 try: if inv.mod_data[ua._module.id] != NULL: invitation = ( inv.mod_data[ua._module.id])() if invitation is None: return 1 if invitation.peer_address is None: invitation.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: invitation.peer_address.ip = rdata.pkt_info.src_name invitation.peer_address.port = rdata.pkt_info.src_port rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) with nogil: status = pjsip_inv_initial_answer(inv, rdata, 100, NULL, NULL, &answer_tdata) if status != 0: raise PJSIPError("Could not create initial (unused) response to re-INVITE", status) with nogil: pjsip_tx_data_dec_ref(answer_tdata) if offer != NULL: sub_state = "received_proposal" else: sub_state = "received_proposal_request" try: timer = StateCallbackTimer("connected", sub_state, rdata_dict, None, "remote") timer.schedule(0, invitation._cb_state, invitation) except: invitation._fail(ua) return 1 return 0 except: ua._handle_exception(1) return 1 cdef void _Invitation_cb_tsx_state_changed(pjsip_inv_session *inv, pjsip_transaction *tsx, pjsip_event *e) with gil: cdef pjsip_rx_data *rdata = NULL cdef pjsip_tx_data *tdata = NULL cdef object rdata_dict = None cdef object tdata_dict = None cdef object originator = None cdef Invitation invitation cdef PJSIPUA ua cdef StateCallbackTimer timer cdef TransferRequestCallbackTimer transfer_timer try: ua = _get_ua() except: return try: if tsx == NULL or e == NULL: return if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_RX_MSG: rdata = e.body.tsx_state.src.rdata if e.type == PJSIP_EVENT_TSX_STATE and e.body.tsx_state.type == PJSIP_EVENT_TX_MSG: tdata = e.body.tsx_state.src.tdata if inv.mod_data[ua._module.id] != NULL: invitation = ( inv.mod_data[ua._module.id])() if invitation is None: return if rdata != NULL: if invitation.peer_address is None: invitation.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: invitation.peer_address.ip = rdata.pkt_info.src_name invitation.peer_address.port = rdata.pkt_info.src_port if ((tsx.state == PJSIP_TSX_STATE_TERMINATED or tsx.state == PJSIP_TSX_STATE_COMPLETED) and (inv.neg != NULL and pjmedia_sdp_neg_get_state(inv.neg) in (PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER, PJMEDIA_SDP_NEG_STATE_DONE)) and invitation._reinvite_transaction != NULL and invitation._reinvite_transaction == tsx): if rdata != NULL: rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) originator = "remote" if tdata != NULL: tdata_dict = dict() _pjsip_msg_to_dict(tdata.msg, tdata_dict) originator = "local" try: timer = StateCallbackTimer("connected", "normal", rdata_dict, tdata_dict, originator) timer.schedule(0, invitation._cb_state, invitation) except: invitation._fail(ua) elif (invitation.state in ("incoming", "early") and invitation.direction == "incoming" and rdata != NULL and rdata.msg_info.msg.type == PJSIP_REQUEST_MSG and rdata.msg_info.msg.line.req.method.id == PJSIP_CANCEL_METHOD): rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) originator = "remote" try: timer = StateCallbackTimer("disconnected", None, rdata_dict, None, originator) timer.schedule(0, invitation._cb_state, invitation) except: invitation._fail(ua) elif (tsx.role == PJSIP_ROLE_UAS and tsx.state == PJSIP_TSX_STATE_TRYING and rdata != NULL and rdata.msg_info.msg.type == PJSIP_REQUEST_MSG and _pj_str_to_str(tsx.method.name) == "REFER"): invitation.process_incoming_transfer(ua, rdata) elif (tsx.role == PJSIP_ROLE_UAS and tsx.state == PJSIP_TSX_STATE_TRYING and rdata != NULL and rdata.msg_info.msg.type == PJSIP_REQUEST_MSG and tsx.method.id == PJSIP_OPTIONS_METHOD): invitation.process_incoming_options(ua, rdata) except: ua._handle_exception(1) cdef void _Invitation_cb_new(pjsip_inv_session *inv, pjsip_event *e) with gil: # As far as I can tell this is never actually called! pass cdef void _Invitation_transfer_cb_state(pjsip_evsub *sub, pjsip_event *event) with gil: cdef void *invitation_void cdef Invitation invitation cdef object state cdef int code = 0 cdef dict event_dict = dict() cdef str reason = None cdef pjsip_rx_data *rdata = NULL cdef PJSIPUA ua try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: return invitation = ( invitation_void)() if invitation is None: return state = pjsip_evsub_get_state_name(sub) if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and (event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED or event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_TERMINATED)): if state == "TERMINATED": if event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC: code = event.body.tsx_state.tsx.status_code reason = _pj_str_to_str(event.body.tsx_state.tsx.status_text) else: reason = "Referral has expired" if event.body.tsx_state.type == PJSIP_EVENT_RX_MSG and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "NOTIFY": # Extract code and reason from the sipfrag payload rdata = event.body.tsx_state.src.rdata if rdata != NULL: _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) if event_dict.get('body', None) is not None: match = sipfrag_re.match(event_dict['body']) if match: code = int(match.group('code')) reason = match.group('reason') try: timer = TransferStateCallbackTimer(state, code, reason) timer.schedule(0, invitation._transfer_cb_state, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef void _Invitation_transfer_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil: cdef void *invitation_void cdef Invitation invitation cdef pjsip_rx_data *rdata cdef PJSIPUA ua try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: return invitation = ( invitation_void)() if invitation is None: return if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.type == PJSIP_EVENT_RX_MSG and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC and event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED and _pj_str_to_str(event.body.tsx_state.tsx.method.name) in ("REFER", "SUBSCRIBE") and event.body.tsx_state.tsx.status_code/100 == 2): rdata = event.body.tsx_state.src.rdata if rdata != NULL: rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) try: timer = TransferResponseCallbackTimer(_pj_str_to_bytes(event.body.tsx_state.tsx.method.name), rdata_dict) timer.schedule(0, invitation._transfer_cb_response, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef void _Invitation_transfer_cb_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil: cdef void *invitation_void cdef Invitation invitation cdef TransferRequestCallbackTimer timer cdef PJSIPUA ua try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: return invitation = ( invitation_void)() if invitation is None: return if rdata != NULL: rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) try: timer = TransferRequestCallbackTimer(rdata_dict) timer.schedule(0, invitation._transfer_cb_notify, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef void _Invitation_transfer_cb_refresh(pjsip_evsub *sub) with gil: # We want to handle the refresh timer oursevles, ignore the PJSIP provided timer pass cdef void _Invitation_transfer_in_cb_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil: cdef void *invitation_void cdef dict rdata_dict cdef pjsip_expires_hdr *expires_header cdef Invitation invitation cdef Timer timer cdef PJSIPUA ua try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: p_st_code[0] = 481 return invitation = ( invitation_void)() if invitation is None: p_st_code[0] = 481 return expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, NULL) if expires_header != NULL and expires_header.ivalue == 0: try: timer = Timer() timer.schedule(0, invitation._terminate_transfer, invitation) except: invitation._fail(ua) p_st_code[0] = 200 return p_st_code[0] = 501 except: ua._handle_exception(1) cdef void _Invitation_transfer_in_cb_server_timeout(pjsip_evsub *sub) with gil: cdef void *invitation_void cdef Invitation invitation cdef Timer timer cdef PJSIPUA ua try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: return invitation = ( invitation_void)() if invitation is None: return try: timer = Timer() timer.schedule(0, invitation._transfer_cb_server_timeout, invitation) except: invitation._fail(ua) except: ua._handle_exception(1) cdef void _Invitation_transfer_in_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil: cdef void *invitation_void cdef Invitation invitation cdef PJSIPUA ua cdef pjsip_rx_data *rdata cdef dict event_dict cdef int code cdef str reason cdef TransferStateCallbackTimer timer try: ua = _get_ua() except: return try: invitation_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if invitation_void == NULL: return invitation = ( invitation_void)() if invitation is None: return if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "NOTIFY" and event.body.tsx_state.tsx.state in (PJSIP_TSX_STATE_COMPLETED, PJSIP_TSX_STATE_TERMINATED)): code = event.body.tsx_state.tsx.status_code reason = _pj_str_to_str(event.body.tsx_state.tsx.status_text) if code in (408, 481) or code/100==7: # Be careful! PJSIP will erase the subscription timer = TransferStateCallbackTimer("TERMINATED", code, reason) timer.schedule(0, invitation._transfer_cb_state, invitation) except: ua._handle_exception(1) # Globals # cdef pjsip_inv_callback _inv_cb _inv_cb.on_state_changed = _Invitation_cb_state _inv_cb.on_media_update = _Invitation_cb_sdp_done _inv_cb.on_rx_reinvite = _Invitation_cb_rx_reinvite _inv_cb.on_tsx_state_changed = _Invitation_cb_tsx_state_changed _inv_cb.on_new_session = _Invitation_cb_new cdef pjsip_evsub_user _transfer_cb _transfer_cb.on_evsub_state = _Invitation_transfer_cb_state _transfer_cb.on_tsx_state = _Invitation_transfer_cb_tsx _transfer_cb.on_rx_notify = _Invitation_transfer_cb_notify _transfer_cb.on_client_refresh = _Invitation_transfer_cb_refresh cdef pjsip_evsub_user _incoming_transfer_cb _incoming_transfer_cb.on_rx_refresh = _Invitation_transfer_in_cb_rx_refresh _incoming_transfer_cb.on_server_timeout = _Invitation_transfer_in_cb_server_timeout _incoming_transfer_cb.on_tsx_state = _Invitation_transfer_in_cb_tsx diff --git a/sipsimple/core/_core.mediatransport.pxi b/sipsimple/core/_core.mediatransport.pxi index e64ee968..ee61d10a 100644 --- a/sipsimple/core/_core.mediatransport.pxi +++ b/sipsimple/core/_core.mediatransport.pxi @@ -1,2558 +1,2558 @@ import sys from errno import EADDRINUSE # classes cdef class RTPTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef char* c_pool_name cdef PJSIPUA ua ua = _get_ua() pool_name = b"RTPTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) self._af = pj_AF_INET() status = pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "rtp_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool self.state = "NULL" def __init__(self, encryption=None, use_ice=False, ice_stun_address=None, ice_stun_port=PJ_STUN_PORT): cdef PJSIPUA ua = _get_ua() if self.state != "NULL": raise SIPCoreError("RTPTransport.__init__() was already called") self._rtp_valid_pair = None self._encryption = encryption self.use_ice = use_ice self.ice_stun_address = ice_stun_address self.ice_stun_port = ice_stun_port def __dealloc__(self): cdef PJSIPUA ua cdef pjmedia_transport *transport cdef Timer timer try: ua = _get_ua() except: return transport = self._obj if transport != NULL: transport.user_data = NULL if self._wrapped_transport != NULL: self._wrapped_transport.user_data = NULL with nogil: pjmedia_transport_media_stop(transport) pjmedia_transport_close(transport) self._obj = NULL self._wrapped_transport = NULL ua.release_memory_pool(self._pool) self._pool = NULL if self._lock != NULL: pj_mutex_destroy(self._lock) timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self.state = "INVALID" self._obj = NULL self._wrapped_transport = NULL self._pool = NULL return None cdef void _get_info(self, pjmedia_transport_info *info): cdef int status cdef pjmedia_transport *transport transport = self._obj with nogil: pjmedia_transport_info_init(info) status = pjmedia_transport_get_info(transport, info) if status != 0: raise PJSIPError("Could not get transport info", status) property local_rtp_port: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) if pj_sockaddr_has_addr(&info.sock_info.rtp_addr_name): return pj_sockaddr_get_port(&info.sock_info.rtp_addr_name) else: return None finally: with nogil: pj_mutex_unlock(lock) property local_rtp_address: def __get__(self): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) if pj_sockaddr_has_addr(&info.sock_info.rtp_addr_name): return pj_sockaddr_print(&info.sock_info.rtp_addr_name, buf, PJ_INET6_ADDRSTRLEN, 0) else: return None finally: with nogil: pj_mutex_unlock(lock) property local_rtp_candidate: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._rtp_valid_pair: return self._rtp_valid_pair.local_candidate return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_port: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None if self._ice_active() and self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate.port self._get_info(&info) if pj_sockaddr_has_addr(&info.src_rtp_name): return pj_sockaddr_get_port(&info.src_rtp_name) else: return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_address: def __get__(self): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None if self._ice_active() and self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate.address self._get_info(&info) if pj_sockaddr_has_addr(&info.src_rtp_name): return pj_sockaddr_print(&info.src_rtp_name, buf, PJ_INET6_ADDRSTRLEN, 0) else: return None finally: with nogil: pj_mutex_unlock(lock) property remote_rtp_candidate: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._rtp_valid_pair: return self._rtp_valid_pair.remote_candidate return None finally: with nogil: pj_mutex_unlock(lock) property srtp_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_srtp_info *srtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) srtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_SRTP) if srtp_info != NULL: return bool(srtp_info.active) return False finally: with nogil: pj_mutex_unlock(lock) property srtp_cipher: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_srtp_info *srtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) srtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_SRTP) if srtp_info == NULL or not bool(srtp_info.active): return None return _pj_str_to_bytes(srtp_info.tx_policy.name) finally: with nogil: pj_mutex_unlock(lock) property zrtp_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info != NULL: return bool(zrtp_info.active) return False finally: with nogil: pj_mutex_unlock(lock) cdef int _ice_active(self): # this function needs to be called with the lock held cdef pjmedia_transport_info info cdef pjmedia_ice_transport_info *ice_info if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return 0 self._get_info(&info) ice_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ICE) if ice_info != NULL and ice_info.sess_state == PJ_ICE_STRANS_STATE_RUNNING: return 1 return 0 property ice_active: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: return bool(self._ice_active()) finally: with nogil: pj_mutex_unlock(lock) cdef int _init_local_sdp(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_pool_t *pool cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport pool = self._pool transport = self._obj pj_local_sdp = local_sdp.get_sdp_session() if remote_sdp is not None: pj_remote_sdp = remote_sdp.get_sdp_session() else: pj_remote_sdp = NULL if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if sdp_index >= pj_local_sdp.media_count: raise ValueError("sdp_index argument out of range") with nogil: status = pjmedia_transport_media_create(transport, pool, 0, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not create media transport", status) return 0 def set_LOCAL(self, SDPSession local_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if local_sdp is None: raise SIPCoreError("local_sdp argument cannot be None") if self.state == "LOCAL": return if self.state != "INIT": raise SIPCoreError('set_LOCAL can only be called in the "INIT" state, current state is "%s"' % self.state) self._init_local_sdp(local_sdp, None, sdp_index) self.state = "LOCAL" finally: with nogil: pj_mutex_unlock(lock) def set_REMOTE(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if None in [local_sdp, remote_sdp]: raise SIPCoreError("SDP arguments cannot be None") if self.state == "REMOTE": return if self.state != "INIT": raise SIPCoreError('set_REMOTE can only be called in the "INIT" state, current state is "%s"' % self.state) self._init_local_sdp(local_sdp, remote_sdp, sdp_index) self.state = "REMOTE" finally: with nogil: pj_mutex_unlock(lock) def set_ESTABLISHED(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport = self._obj _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: transport = self._obj if None in [local_sdp, remote_sdp]: raise SIPCoreError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if self.state == "ESTABLISHED": return if self.state not in ["LOCAL", "REMOTE"]: raise SIPCoreError('set_ESTABLISHED can only be called in the "INIT" and "LOCAL" states, ' + 'current state is "%s"' % self.state) with nogil: status = pjmedia_transport_media_start(transport, self._pool, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not start media transport", status) self.state = "ESTABLISHED" finally: with nogil: pj_mutex_unlock(lock) def set_INIT(self): global _ice_cb cdef int af cdef int i cdef int status cdef int port cdef pj_caching_pool *caching_pool cdef pj_ice_strans_cfg ice_cfg cdef pj_ice_strans *ice_st cdef pj_ice_strans_state ice_state cdef pj_mutex_t *lock = self._lock cdef pj_str_t local_ip cdef pj_str_t *local_ip_address cdef pjmedia_endpt *media_endpoint cdef pjmedia_srtp_setting srtp_setting cdef pjmedia_transport **transport_address cdef pjmedia_transport *wrapped_transport cdef pjsip_endpoint *sip_endpoint cdef bytes zid_file cdef char *c_zid_file cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: af = self._af caching_pool = &ua._caching_pool._obj media_endpoint = ua._pjmedia_endpoint._obj sip_endpoint = ua._pjsip_endpoint._obj transport_address = &self._obj if self.state == "INIT": return if self.state in ["LOCAL", "ESTABLISHED"]: with nogil: status = pjmedia_transport_media_stop(transport_address[0]) if status != 0: raise PJSIPError("Could not stop media transport", status) self.state = "INIT" elif self.state == "NULL": if ua.ip_address is None: local_ip_address = NULL else: _str_to_pj_str(ua.ip_address, &local_ip) local_ip_address = &local_ip if self.use_ice: with nogil: pj_ice_strans_cfg_default(&ice_cfg) ice_cfg.af = self._af with nogil: pj_stun_config_init(&ice_cfg.stun_cfg, &caching_pool.factory, 0, pjmedia_endpt_get_ioqueue(media_endpoint), pjsip_endpt_get_timer_heap(sip_endpoint)) if self.ice_stun_address is not None: _str_to_pj_str(self.ice_stun_address, &ice_cfg.stun.server) ice_cfg.stun.port = self.ice_stun_port # IIRC we can't choose the port for ICE with nogil: status = pj_sockaddr_init(ice_cfg.af, &ice_cfg.stun.cfg.bound_addr, local_ip_address, 0) if status != 0: raise PJSIPError("Could not init ICE bound address", status) with nogil: status = pjmedia_ice_create2(media_endpoint, NULL, 2, &ice_cfg, &_ice_cb, 0, transport_address) if status != 0: raise PJSIPError("Could not create ICE media transport", status) else: status = PJ_EBUG for i in xrange(ua._rtp_port_index, ua._rtp_port_index + ua._rtp_port_usable_count, 2): port = ua._rtp_port_start + i % ua._rtp_port_usable_count with nogil: status = pjmedia_transport_udp_create3(media_endpoint, af, NULL, local_ip_address, port, 0, transport_address) if status != PJ_ERRNO_START_SYS + EADDRINUSE: ua._rtp_port_index = (i + 2) % ua._rtp_port_usable_count break if status != 0: raise PJSIPError("Could not create UDP/RTP media transport", status) self._obj.user_data = self.weakref if self._encryption is not None: wrapped_transport = self._wrapped_transport = self._obj self._obj = NULL if self._encryption.startswith('sdes'): with nogil: pjmedia_srtp_setting_default(&srtp_setting) if self._encryption == 'sdes_mandatory': srtp_setting.use = PJMEDIA_SRTP_MANDATORY with nogil: status = pjmedia_transport_srtp_create(media_endpoint, wrapped_transport, &srtp_setting, transport_address) if status != 0: with nogil: pjmedia_transport_close(wrapped_transport) self._wrapped_transport = NULL raise PJSIPError("Could not create SRTP media transport", status) elif self._encryption == 'zrtp': with nogil: status = pjmedia_transport_zrtp_create(media_endpoint, pjsip_endpt_get_timer_heap(sip_endpoint), wrapped_transport, transport_address, 1) if status == 0: zid_file = ua.zrtp_cache.encode(sys.getfilesystemencoding()) c_zid_file = zid_file with nogil: # Auto-enable is deactivated status = pjmedia_transport_zrtp_initialize(self._obj, c_zid_file, 0, &_zrtp_cb) if status != 0: with nogil: pjmedia_transport_close(wrapped_transport) self._wrapped_transport = NULL raise PJSIPError("Could not create ZRTP media transport", status) else: raise RuntimeError('invalid SRTP key negotiation specified: %s' % self._encryption) self._obj.user_data = self.weakref if not self.use_ice or self.ice_stun_address is None: self.state = "INIT" _add_event("RTPTransportDidInitialize", dict(obj=self)) else: self.state = "WAIT_STUN" if self.use_ice: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=self, prev_state="NULL", state="GATHERING")) ice_st = pjmedia_ice_get_strans(transport_address[0]) if ice_st != NULL: ice_state = pj_ice_strans_get_state(ice_st) if ice_state == PJ_ICE_STRANS_STATE_READY: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=self, prev_state="GATHERING", state="GATHERING_COMPLETE")) else: raise SIPCoreError('set_INIT can only be called in the "NULL", "LOCAL" and "ESTABLISHED" states, ' + 'current state is "%s"' % self.state) finally: with nogil: pj_mutex_unlock(lock) def set_zrtp_sas_verified(self, verified): cdef int status cdef int c_verified cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return False with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return False self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return False c_verified = int(verified) with nogil: pjmedia_transport_zrtp_setSASVerified(self._obj, c_verified) return True finally: with nogil: pj_mutex_unlock(lock) def set_zrtp_enabled(self, enabled, object master_stream): cdef int status cdef int c_enabled cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua cdef bytes multistream_params cdef char *c_multistream_params cdef int length cdef RTPTransport master_transport ua = self._check_ua() if ua is None: return with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL: return if master_stream is not None: master_transport = master_stream._rtp_transport assert master_transport is not None # extract the multistream parameters multistream_params = master_transport.zrtp_multistream_parameters if multistream_params: # set multistream mode in ourselves c_multistream_params = multistream_params length = len(multistream_params) with nogil: pjmedia_transport_zrtp_setMultiStreamParameters(self._obj, c_multistream_params, length, master_transport._obj) c_enabled = int(enabled) with nogil: pjmedia_transport_zrtp_setEnableZrtp(self._obj, c_enabled) finally: with nogil: pj_mutex_unlock(lock) property zrtp_multistream_parameters: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua cdef char *multistr_params cdef int length ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None with nogil: multistr_params = pjmedia_transport_zrtp_getMultiStreamParameters(self._obj, &length) if length > 0: ret = _pj_buf_len_to_str(multistr_params, length) free(multistr_params) return ret else: return None finally: with nogil: pj_mutex_unlock(lock) property zrtp_cipher: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None return _buf_to_str(zrtp_info.cipher) finally: with nogil: pj_mutex_unlock(lock) property zrtp_peer_name: def __get__(self): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return '' with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return '' self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return '' with nogil: c_name = pjmedia_transport_zrtp_getPeerName(self._obj) if c_name == NULL: return '' else: name = PyUnicode_FromString(c_name) or u'' free(c_name) return name finally: with nogil: pj_mutex_unlock(lock) def __set__(self, basestring name): cdef int status cdef char* c_name cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return name = name.encode('utf-8') c_name = name with nogil: pjmedia_transport_zrtp_putPeerName(self._obj, c_name) finally: with nogil: pj_mutex_unlock(lock) property zrtp_peer_id: def __get__(self): cdef int status cdef unsigned char name[12] # IDENTIFIER_LEN, 96bits cdef pj_mutex_t *lock = self._lock cdef pjmedia_zrtp_info *zrtp_info cdef pjmedia_transport_info info cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self.state in ["NULL", "WAIT_STUN", "INVALID"]: return None self._get_info(&info) zrtp_info = pjmedia_transport_info_get_spc_info(&info, PJMEDIA_TRANSPORT_TYPE_ZRTP) if zrtp_info == NULL or not bool(zrtp_info.active): return None with nogil: status = pjmedia_transport_zrtp_getPeerZid(self._obj, name) if status <= 0: return None else: return _pj_buf_len_to_str(name, 12) finally: with nogil: pj_mutex_unlock(lock) def update_local_sdp(self, SDPSession local_sdp, BaseSDPSession remote_sdp=None, int sdp_index=0): cdef int status cdef pj_pool_t *pool cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_transport *transport cdef SDPMediaStream local_media pool = self._pool transport = self._obj pj_local_sdp = local_sdp.get_sdp_session() if remote_sdp is not None: pj_remote_sdp = remote_sdp.get_sdp_session() else: pj_remote_sdp = NULL if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if sdp_index >= pj_local_sdp.media_count: raise ValueError("sdp_index argument out of range") # Remove ICE and SRTP/ZRTP related attributes from SDP, they will be added by pjmedia_transport_encode_sdp local_media = local_sdp.media[sdp_index] local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in ('crypto', 'zrtp-hash', 'ice-ufrag', 'ice-pwd', 'ice-mismatch', 'candidate', 'remote-candidates')] pj_local_sdp = local_sdp.get_sdp_session() with nogil: status = pjmedia_transport_encode_sdp(transport, pool, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not update SDP for media transport", status) local_sdp._update() return 0 cdef class MediaCheckTimer(Timer): def __init__(self, media_check_interval): self.media_check_interval = media_check_interval cdef class SDPInfo: def __init__(self, BaseSDPMediaStream local_media=None, BaseSDPSession local_sdp=None, BaseSDPSession remote_sdp=None, int index=0): self.local_media = local_media self.local_sdp = local_sdp self.remote_sdp = remote_sdp self.index = index property local_media: def __get__(self): return self._local_media def __set__(self, local_media): if local_media is not None: local_media = SDPMediaStream.new(local_media) self._local_media = local_media property local_sdp: def __get__(self): return self._local_sdp def __set__(self, local_sdp): if local_sdp is not None: local_sdp = SDPSession.new(local_sdp) self._local_sdp = local_sdp property remote_sdp: def __get__(self): return self._remote_sdp def __set__(self, remote_sdp): if remote_sdp is not None: remote_sdp = SDPSession.new(remote_sdp) self._remote_sdp = remote_sdp cdef class AudioTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef char* c_pool_name cdef PJSIPUA ua ua = _get_ua() pool_name = b"AudioTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) status = pj_mutex_create_recursive(ua._pjsip_endpoint._pool, "audio_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool self._slot = -1 self._timer = None self._volume = 100 def __init__(self, AudioMixer mixer, RTPTransport transport, BaseSDPSession remote_sdp=None, int sdp_index=0, enable_silence_detection=False, list codecs=None): cdef int status cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media_c cdef pjmedia_sdp_session *local_sdp_c cdef pj_sockaddr *addr cdef pjmedia_transport_info info cdef list global_codecs cdef SDPMediaStream local_media cdef SDPSession local_sdp cdef PJSIPUA ua ua = _get_ua() media_endpoint = ua._pjmedia_endpoint._obj pool = self._pool if self.transport is not None: raise SIPCoreError("AudioTransport.__init__() was already called") if mixer is None: raise ValueError("mixer argument may not be None") if transport is None: raise ValueError("transport argument cannot be None") if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if transport.state != "INIT": raise SIPCoreError('RTPTransport object provided is not in the "INIT" state, but in the "%s" state' % transport.state) self._vad = int(bool(enable_silence_detection)) self.mixer = mixer self.transport = transport transport._get_info(&info) global_codecs = ua._pjmedia_endpoint._get_current_codecs() if codecs is None: codecs = global_codecs try: ua._pjmedia_endpoint._set_codecs(codecs) addr = &info.sock_info.rtp_addr_name with nogil: status = pjmedia_endpt_create_base_sdp(media_endpoint, pool, NULL, addr, &local_sdp_c) if status != 0: raise PJSIPError("Could not generate base SDP", status) with nogil: status = pjmedia_endpt_create_audio_sdp(media_endpoint, pool, &info.sock_info, 0, &local_media_c) if status != 0: raise PJSIPError("Could not generate SDP audio stream", status) # Create a 'fake' SDP, which only contains the audio stream, then the m line is extracted because the full # SDP is built by the Session local_sdp_c.media_count = 1 local_sdp_c.media[0] = local_media_c finally: ua._pjmedia_endpoint._set_codecs(global_codecs) local_sdp = SDPSession_create(local_sdp_c) local_media = local_sdp.media[0] if remote_sdp is None: self._is_offer = 1 self.transport.set_LOCAL(local_sdp, 0) else: self._is_offer = 0 if sdp_index != 0: local_sdp.media = [None] * (sdp_index+1) local_sdp.media[sdp_index] = local_media self.transport.set_REMOTE(local_sdp, remote_sdp, sdp_index) self._sdp_info = SDPInfo(local_media, local_sdp, remote_sdp, sdp_index) def __dealloc__(self): cdef PJSIPUA ua cdef Timer timer try: ua = _get_ua() except: return if self._obj != NULL: self.stop() ua.release_memory_pool(self._pool) self._pool = NULL if self._lock != NULL: pj_mutex_destroy(self._lock) timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self._obj = NULL self._pool = NULL return None property is_active: def __get__(self): self._check_ua() return bool(self._obj != NULL) property is_started: def __get__(self): return bool(self._is_started) property codec: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return _pj_str_to_bytes(self._stream_info.fmt.encoding_name) property sample_rate: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return self._stream_info.fmt.clock_rate property enable_silence_detection: def __get__(self): return bool(self._vad) property statistics: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_stream *stream cdef dict statistics = dict() cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return None with nogil: status = pjmedia_stream_get_stat(stream, &stat) if status != 0: raise PJSIPError("Could not get RTP statistics", status) statistics["rtt"] = _pj_math_stat_to_dict(&stat.rtt) statistics["rx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.rx) statistics["tx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.tx) return statistics finally: with nogil: pj_mutex_unlock(lock) property volume: def __get__(self): return self._volume def __set__(self, value): cdef int slot cdef int status cdef int volume cdef pj_mutex_t *lock = self._lock cdef pjmedia_conf *conf_bridge cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: conf_bridge = self.mixer._obj slot = self._slot if value < 0: raise ValueError("volume attribute cannot be negative") if ua is not None and self._obj != NULL: volume = int(value * 1.28 - 128) with nogil: status = pjmedia_conf_adjust_rx_level(conf_bridge, slot, volume) if status != 0: raise PJSIPError("Could not set volume of audio transport", status) self._volume = value finally: if ua is not None: with nogil: pj_mutex_unlock(lock) property slot: def __get__(self): self._check_ua() if self._slot == -1: return None else: return self._slot def get_local_media(self, BaseSDPSession remote_sdp=None, int index=0, direction="sendrecv"): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef object direction_attr cdef SDPAttribute attr cdef SDPSession local_sdp cdef SDPMediaStream local_media cdef pjmedia_sdp_media *c_local_media _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: is_offer = remote_sdp is None if is_offer and direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self._sdp_info.index = index local_sdp = self._sdp_info.local_sdp local_media = self._sdp_info.local_media local_sdp.media = [None] * (index+1) local_sdp.media[index] = local_media self.transport.update_local_sdp(local_sdp, remote_sdp, index) # updating the local SDP might have modified the connection line if local_sdp.connection is not None and local_media.connection is None: local_media.connection = SDPConnection.new(local_sdp.connection) local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in valid_sdp_directions] if is_offer: direction_attr = direction else: if self.direction is None or "recv" in self.direction.decode(): direction_attr = b"sendrecv" else: direction_attr = b"sendonly" local_media.attributes.append(SDPAttribute(direction_attr, b"")) for attribute in local_media.attributes: if attribute.name == b'rtcp': attribute.value = (attribute.value.decode().split(' ', 1)[0]).encode() self._sdp_info.local_media = local_media return local_media finally: with nogil: pj_mutex_unlock(lock) def start(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index, int timeout=30): cdef int status cdef object desired_state cdef pj_mutex_t *lock = self._lock cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_port *media_port cdef pjmedia_sdp_media *local_media cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_stream **stream_address cdef pjmedia_stream_info *stream_info_address cdef pjmedia_transport *transport cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: pool = self._pool media_endpoint = ua._pjmedia_endpoint._obj stream_address = &self._obj stream_info_address = &self._stream_info transport = self.transport._obj if self._is_started: raise SIPCoreError("This AudioTransport was already started once") desired_state = ("LOCAL" if self._is_offer else "REMOTE") if self.transport.state != desired_state: raise SIPCoreError('RTPTransport object provided is not in the "%s" state, but in the "%s" state' % (desired_state, self.transport.state)) if None in [local_sdp, remote_sdp]: raise ValueError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if local_sdp.media[sdp_index].port == 0 or remote_sdp.media[sdp_index].port == 0: raise SIPCoreError("Cannot start a rejected audio stream") if timeout < 0: raise ValueError("timeout value cannot be negative") self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index) with nogil: status = pjmedia_stream_info_from_sdp(stream_info_address, pool, media_endpoint, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not parse SDP for audio session", status) if self._stream_info.param == NULL: raise SIPCoreError("Could not parse SDP for audio session") self._stream_info.param.setting.vad = self._vad self._stream_info.use_ka = 1 with nogil: status = pjmedia_stream_create(media_endpoint, pool, stream_info_address, transport, NULL, stream_address) if status != 0: raise PJSIPError("Could not initialize RTP for audio session", status) with nogil: status = pjmedia_stream_set_dtmf_callback(stream_address[0], _AudioTransport_cb_dtmf, self.weakref) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not set DTMF callback for audio session", status) with nogil: status = pjmedia_stream_start(stream_address[0]) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not start RTP for audio session", status) with nogil: status = pjmedia_stream_get_port(stream_address[0], &media_port) if status != 0: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise PJSIPError("Could not get audio port for audio session", status) try: self._slot = self.mixer._add_port(ua, pool, media_port) if self._volume != 100: self.volume = self._volume except: with nogil: pjmedia_stream_destroy(stream_address[0]) self._obj = NULL raise self.update_direction(local_sdp.media[sdp_index].direction) self._sdp_info.local_media = local_sdp.media[sdp_index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = sdp_index self._is_started = 1 if timeout > 0: self._timer = MediaCheckTimer(timeout) self._timer.schedule(timeout, self._cb_check_rtp, self) self.mixer.reset_ec() finally: with nogil: pj_mutex_unlock(lock) def stop(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_stream *stream cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._timer is not None: self._timer.cancel() self._timer = None if self._obj == NULL: return self._obj = NULL self.mixer._remove_port(ua, self._slot) with nogil: pjmedia_stream_destroy(stream) self.transport.set_INIT() finally: if ua is not None: with nogil: pj_mutex_unlock(lock) def update_direction(self, direction): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") if direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) if direction != self.direction: self.mixer.reset_ec() self.direction = direction finally: with nogil: pj_mutex_unlock(lock) def update_sdp(self, local_sdp, remote_sdp, index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") self._sdp_info.local_media = local_sdp.media[index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = index finally: with nogil: pj_mutex_unlock(lock) def send_dtmf(self, digit): cdef int status cdef pj_mutex_t *lock = self._lock cdef pj_str_t digit_pj cdef pjmedia_stream *stream cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") if len(digit) != 1 or digit not in "0123456789*#ABCD": raise SIPCoreError("Not a valid DTMF digit: %s" % digit) - _str_to_pj_str(digit, &digit_pj) + _str_to_pj_str(digit.encode(), &digit_pj) if not self._stream_info.tx_event_pt < 0: # If the remote doesn't support telephone-event just don't send DTMF with nogil: status = pjmedia_stream_dial_dtmf(stream, &digit_pj) if status != 0: raise PJSIPError("Could not send DTMF digit on audio stream", status) finally: with nogil: pj_mutex_unlock(lock) cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_stream *stream with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return 0 if self._timer is None: return 0 self._timer = None with nogil: status = pjmedia_stream_get_stat(stream, &stat) if status == 0: if self._packets_received == stat.rx.pkt and self.direction == "sendrecv": _add_event("RTPAudioTransportDidTimeout", dict(obj=self)) self._packets_received = stat.rx.pkt if timer.media_check_interval > 0: self._timer = MediaCheckTimer(timer.media_check_interval) self._timer.schedule(timer.media_check_interval, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) cdef class VideoTransport: def __cinit__(self, *args, **kwargs): cdef int status cdef pj_pool_t *pool cdef bytes pool_name cdef PJSIPUA ua ua = _get_ua() endpoint = ua._pjsip_endpoint._obj pool_name = b"VideoTransport_%d" % id(self) self.weakref = weakref.ref(self) Py_INCREF(self.weakref) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool status = pj_mutex_create_recursive(pool, "video_transport_lock", &self._lock) if status != 0: raise PJSIPError("failed to create lock", status) self._timer = None def __init__(self, RTPTransport transport, BaseSDPSession remote_sdp=None, int sdp_index=0, list codecs=None): cdef int status cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media_c cdef pjmedia_sdp_session *local_sdp_c cdef pjmedia_transport_info info cdef pj_sockaddr *addr cdef list global_codecs cdef SDPMediaStream local_media cdef SDPSession local_sdp cdef PJSIPUA ua ua = _get_ua() media_endpoint = ua._pjmedia_endpoint._obj pool = self._pool if self.transport is not None: raise SIPCoreError("VideoTransport.__init__() was already called") if transport is None: raise ValueError("transport argument cannot be None") if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if transport.state != "INIT": raise SIPCoreError('RTPTransport object provided is not in the "INIT" state, but in the "%s" state' % transport.state) self.transport = transport transport._get_info(&info) global_codecs = ua._pjmedia_endpoint._get_current_video_codecs() if codecs is None: codecs = global_codecs try: ua._pjmedia_endpoint._set_video_codecs(codecs) addr = &(info.sock_info.rtp_addr_name) with nogil: status = pjmedia_endpt_create_base_sdp(media_endpoint, pool, NULL, addr, &local_sdp_c) if status != 0: raise PJSIPError("Could not generate base SDP", status) with nogil: status = pjmedia_endpt_create_video_sdp(media_endpoint, pool, &info.sock_info, 0, &local_media_c) if status != 0: raise PJSIPError("Could not generate SDP video stream", status) # Create a 'fake' SDP, which only contains the video stream, then the m line is extracted because the full # SDP is built by the Session local_sdp_c.media_count = 1 local_sdp_c.media[0] = local_media_c finally: ua._pjmedia_endpoint._set_video_codecs(global_codecs) local_sdp = SDPSession_create(local_sdp_c) local_media = local_sdp.media[0] if remote_sdp is None: self._is_offer = 1 self.transport.set_LOCAL(local_sdp, 0) else: self._is_offer = 0 if sdp_index != 0: local_sdp.media = [None] * (sdp_index+1) local_sdp.media[sdp_index] = local_media self.transport.set_REMOTE(local_sdp, remote_sdp, sdp_index) self._sdp_info = SDPInfo(local_media, local_sdp, remote_sdp, sdp_index) self.local_video = None self.remote_video = None def __dealloc__(self): cdef PJSIPUA ua cdef Timer timer try: ua = _get_ua() except SIPCoreError: return if self._obj != NULL: self.stop() if self._lock != NULL: pj_mutex_destroy(self._lock) ua.release_memory_pool(self._pool) self._pool = NULL timer = Timer() try: timer.schedule(60, deallocate_weakref, self.weakref) except SIPCoreError: pass cdef PJSIPUA _check_ua(self): cdef PJSIPUA ua try: ua = _get_ua() return ua except: self._obj = NULL self._pool = NULL return None property is_active: def __get__(self): self._check_ua() return bool(self._obj != NULL) property is_started: def __get__(self): return bool(self._is_started) property codec: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return _pj_str_to_bytes(self._stream_info.codec_info.encoding_name) property sample_rate: def __get__(self): self._check_ua() if self._obj == NULL: return None else: return self._stream_info.codec_info.clock_rate property statistics: def __get__(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_vid_stream *stream cdef dict statistics = dict() cdef PJSIPUA ua ua = self._check_ua() if ua is None: return None with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return None with nogil: status = pjmedia_vid_stream_get_stat(stream, &stat) if status != 0: raise PJSIPError("Could not get RTP statistics", status) statistics["rtt"] = _pj_math_stat_to_dict(&stat.rtt) statistics["rx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.rx) statistics["tx"] = _pjmedia_rtcp_stream_stat_to_dict(&stat.tx) return statistics finally: with nogil: pj_mutex_unlock(lock) def get_local_media(self, BaseSDPSession remote_sdp=None, int index=0, direction="sendrecv"): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef object direction_attr cdef SDPAttribute attr cdef SDPSession local_sdp cdef SDPMediaStream local_media cdef pjmedia_sdp_media *c_local_media _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: is_offer = remote_sdp is None if is_offer and direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self._sdp_info.index = index local_sdp = self._sdp_info.local_sdp local_media = self._sdp_info.local_media local_sdp.media = [None] * (index+1) local_sdp.media[index] = local_media self.transport.update_local_sdp(local_sdp, remote_sdp, index) # updating the local SDP might have modified the connection line if local_sdp.connection is not None and local_media.connection is None: local_media.connection = SDPConnection.new(local_sdp.connection) local_media.attributes = [ attr for attr in local_media.attributes if attr.name not in valid_sdp_directions] if is_offer: direction_attr = direction else: if self.direction is None or "recv" in self.direction.decode(): direction_attr = b"sendrecv" else: direction_attr = b"sendonly" local_media.attributes.append(SDPAttribute(direction_attr, b"")) for attribute in local_media.attributes: if attribute.name == 'rtcp': attribute.value = (attribute.value.decode().split(' ', 1)[0]).encode() return local_media finally: with nogil: pj_mutex_unlock(lock) def start(self, BaseSDPSession local_sdp, BaseSDPSession remote_sdp, int sdp_index, int timeout=30): cdef int status cdef object desired_state cdef pj_mutex_t *lock = self._lock cdef pj_pool_t *pool cdef pjmedia_endpt *media_endpoint cdef pjmedia_sdp_media *local_media cdef pjmedia_sdp_session *pj_local_sdp cdef pjmedia_sdp_session *pj_remote_sdp cdef pjmedia_vid_stream *stream cdef pjmedia_vid_stream_info *stream_info cdef pjmedia_transport *transport cdef PJSIPUA ua ua = _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: pool = self._pool media_endpoint = ua._pjmedia_endpoint._obj stream_info = &self._stream_info transport = self.transport._obj if self._is_started: raise SIPCoreError("This VideoTransport was already started once") desired_state = ("LOCAL" if self._is_offer else "REMOTE") if self.transport.state != desired_state: raise SIPCoreError('RTPTransport object provided is not in the "%s" state, but in the "%s" state' % (desired_state, self.transport.state)) if None in (local_sdp, remote_sdp): raise ValueError("SDP arguments cannot be None") pj_local_sdp = local_sdp.get_sdp_session() pj_remote_sdp = remote_sdp.get_sdp_session() if sdp_index < 0: raise ValueError("sdp_index argument cannot be negative") if local_sdp.media[sdp_index].port == 0 or remote_sdp.media[sdp_index].port == 0: raise SIPCoreError("Cannot start a rejected video stream") if timeout < 0: raise ValueError("timeout value cannot be negative") self.transport.set_ESTABLISHED(local_sdp, remote_sdp, sdp_index) with nogil: status = pjmedia_vid_stream_info_from_sdp(stream_info, pool, media_endpoint, pj_local_sdp, pj_remote_sdp, sdp_index) if status != 0: raise PJSIPError("Could not parse SDP for video session", status) if self._stream_info.codec_param == NULL: raise SIPCoreError("Could not parse SDP for video session") self._stream_info.use_ka = 1 with nogil: status = pjmedia_vid_stream_create(media_endpoint, pool, stream_info, transport, NULL, &stream) if status != 0: raise PJSIPError("Could not initialize RTP for video session", status) self._obj = stream with nogil: status = pjmedia_vid_stream_start(stream) if status != 0: with nogil: pjmedia_vid_stream_destroy(stream) self._obj = NULL raise PJSIPError("Could not start RTP for video session", status) with nogil: pjmedia_vid_stream_send_rtcp_sdes(stream) try: local_video = LocalVideoStream_create(stream) remote_video = RemoteVideoStream_create(stream, self._remote_video_event_handler) except PJSIPError: with nogil: pjmedia_vid_stream_destroy(stream) self._obj = NULL self.local_video = None self.remote_video = None raise self.local_video = local_video self.remote_video = remote_video self.update_direction(local_sdp.media[sdp_index].direction) self._sdp_info.local_media = local_sdp.media[sdp_index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = sdp_index self._is_started = 1 if timeout > 0: self._timer = MediaCheckTimer(timeout) self._timer.schedule(timeout, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) def stop(self): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef PJSIPUA ua ua = self._check_ua() if ua is not None: with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._timer is not None: self._timer.cancel() self._timer = None if self._obj == NULL: return self._obj = NULL if self.local_video is not None: self.local_video.close() self.local_video = None if self.remote_video is not None: self.remote_video.close() self.remote_video = None with nogil: pjmedia_vid_stream_send_rtcp_bye(stream) pjmedia_vid_stream_destroy(stream) self.transport.set_INIT() finally: if ua is not None: with nogil: pj_mutex_unlock(lock) def update_direction(self, direction): global valid_sdp_directions cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") if direction not in valid_sdp_directions: raise SIPCoreError("Unknown direction: %s" % direction) self.direction = direction finally: with nogil: pj_mutex_unlock(lock) def update_sdp(self, local_sdp, remote_sdp, index): cdef int status cdef pj_mutex_t *lock = self._lock _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: if self._obj == NULL: raise SIPCoreError("Stream is not active") self._sdp_info.local_media = local_sdp.media[index] self._sdp_info.local_sdp = local_sdp self._sdp_info.remote_sdp = remote_sdp self._sdp_info.index = index finally: with nogil: pj_mutex_unlock(lock) def pause(self, direction="both"): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef pjmedia_dir pj_dir _get_ua() if direction not in ("incoming", "outgoing", "both"): raise ValueError("direction can only be one of 'incoming', 'outgoing' or 'both'") if direction == "incoming": pj_dir = PJMEDIA_DIR_RENDER elif direction == "outgoing": pj_dir = PJMEDIA_DIR_CAPTURE else: pj_dir = PJMEDIA_DIR_CAPTURE_RENDER with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") with nogil: status = pjmedia_vid_stream_pause(stream, pj_dir) if status != 0: raise PJSIPError("failed to pause video stream", status) finally: with nogil: pj_mutex_unlock(lock) def resume(self, direction="both"): cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream cdef pjmedia_dir pj_dir _get_ua() if direction not in ("incoming", "outgoing", "both"): raise ValueError("direction can only be one of 'incoming', 'outgoing' or 'both'") if direction == "incoming": pj_dir = PJMEDIA_DIR_RENDER elif direction == "outgoing": pj_dir = PJMEDIA_DIR_CAPTURE else: pj_dir = PJMEDIA_DIR_CAPTURE_RENDER with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if self._obj == NULL: raise SIPCoreError("Stream is not active") with nogil: status = pjmedia_vid_stream_resume(stream, pj_dir) if status != 0: raise PJSIPError("failed to resume video stream", status) finally: with nogil: pj_mutex_unlock(lock) def send_keyframe(self): cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream != NULL: # Do not check for errors, it's OK if we can't send it pjmedia_vid_stream_send_keyframe(stream) finally: with nogil: pj_mutex_unlock(lock) def request_keyframe(self): cdef pj_mutex_t *lock = self._lock cdef pjmedia_vid_stream *stream _get_ua() with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream != NULL: # Do not check for errors, it's OK if we can't send it pjmedia_vid_stream_send_rtcp_pli(stream) finally: with nogil: pj_mutex_unlock(lock) cdef int _cb_check_rtp(self, MediaCheckTimer timer) except -1: cdef int status cdef pj_mutex_t *lock = self._lock cdef pjmedia_rtcp_stat stat cdef pjmedia_vid_stream *stream with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: stream = self._obj if stream == NULL: return 0 if self._timer is None: return 0 self._timer = None with nogil: status = pjmedia_vid_stream_get_stat(stream, &stat) if status == 0: if self._packets_received == stat.rx.pkt and self.direction == "sendrecv": _add_event("RTPVideoTransportDidTimeout", dict(obj=self)) self._packets_received = stat.rx.pkt if timer.media_check_interval > 0: self._timer = MediaCheckTimer(timer.media_check_interval) self._timer.schedule(timer.media_check_interval, self._cb_check_rtp, self) finally: with nogil: pj_mutex_unlock(lock) def _remote_video_event_handler(self, str name, object data): if name == "FORMAT_CHANGED": size, framerate = data _add_event("RTPVideoTransportRemoteFormatDidChange", dict(obj=self, size=size, framerate=framerate)) elif name == "RECEIVED_KEYFRAME": _add_event("RTPVideoTransportReceivedKeyFrame", dict(obj=self)) elif name == "MISSED_KEYFRAME": _add_event("RTPVideoTransportMissedKeyFrame", dict(obj=self)) elif name == "REQUESTED_KEYFRAME": _add_event("RTPVideoTransportRequestedKeyFrame", dict(obj=self)) cdef class ICECandidate: def __init__(self, component, cand_type, address, port, priority, rel_addr=''): self.component = component self.type = cand_type self.address = address self.port = port self.priority = priority self.rel_address = rel_addr def __str__(self): return '(%s) %s:%d%s priority=%d type=%s' % (self.component, self.address, self.port, ' rel_addr=%s' % self.rel_address if self.rel_address else '', self.priority, self.type.lower()) cdef ICECandidate ICECandidate_create(pj_ice_sess_cand *cand): cdef char buf[PJ_INET6_ADDRSTRLEN] cdef str address cdef str cand_type cdef int port if cand.type == PJ_ICE_CAND_TYPE_HOST: cand_type = 'HOST' elif cand.type == PJ_ICE_CAND_TYPE_SRFLX: cand_type = 'SRFLX' elif cand.type == PJ_ICE_CAND_TYPE_PRFLX: cand_type = 'PRFLX' elif cand.type == PJ_ICE_CAND_TYPE_RELAYED: cand_type = 'RELAY' else: cand_type = 'UNKNOWN' pj_sockaddr_print(&cand.addr, buf, PJ_INET6_ADDRSTRLEN, 0) address = _buf_to_str(buf) port = pj_sockaddr_get_port(&cand.addr) if pj_sockaddr_has_addr(&cand.rel_addr): pj_sockaddr_print(&cand.rel_addr, buf, PJ_INET6_ADDRSTRLEN, 0) rel_addr = _buf_to_str(buf) else: rel_addr = '' return ICECandidate('RTP' if cand.comp_id==1 else 'RTCP', cand_type, address, port, cand.prio, rel_addr) cdef class ICECheck: def __init__(self, local_candidate, remote_candidate, state, nominated): self.local_candidate = local_candidate self.remote_candidate = remote_candidate self.state = state self.nominated = nominated def __str__(self): return '%s:%d -> %s:%d (%s, %s)' % (self.local_candidate.address, self.local_candidate.port, self.remote_candidate.address, self.remote_candidate.port, self.state.lower(), 'nominated' if self.nominated else 'not nominated') cdef ICECheck ICECheck_create(pj_ice_sess_check *check): cdef str state cdef ICECandidate lcand cdef ICECandidate rcand if check.state == PJ_ICE_SESS_CHECK_STATE_FROZEN: state = 'FROZEN' elif check.state == PJ_ICE_SESS_CHECK_STATE_WAITING: state = 'WAITING' elif check.state == PJ_ICE_SESS_CHECK_STATE_IN_PROGRESS: state = 'IN_PROGRESS' elif check.state == PJ_ICE_SESS_CHECK_STATE_SUCCEEDED: state = 'SUCCEEDED' elif check.state == PJ_ICE_SESS_CHECK_STATE_FAILED: state = 'FAILED' else: state = 'UNKNOWN' lcand = ICECandidate_create(check.lcand) rcand = ICECandidate_create(check.rcand) return ICECheck(lcand, rcand, state, bool(check.nominated)) cdef ICECheck _get_rtp_valid_pair(pj_ice_strans *ice_st): cdef pj_ice_sess_check_ptr_const ice_check ice_check = pj_ice_strans_get_valid_pair(ice_st, 1) if ice_check == NULL: return None return ICECheck_create(ice_check) # helper functions cdef dict _pj_math_stat_to_dict(pj_math_stat *stat): cdef dict retval = dict() retval["count"] = stat.n retval["max"] = stat.max retval["min"] = stat.min retval["last"] = stat.last retval["avg"] = stat.mean return retval cdef dict _pjmedia_rtcp_stream_stat_to_dict(pjmedia_rtcp_stream_stat *stream_stat): cdef dict retval = dict() retval["packets"] = stream_stat.pkt retval["bytes"] = stream_stat.bytes retval["packets_discarded"] = stream_stat.discard retval["packets_lost"] = stream_stat.loss retval["packets_reordered"] = stream_stat.reorder retval["packets_duplicate"] = stream_stat.dup retval["loss_period"] = _pj_math_stat_to_dict(&stream_stat.loss_period) retval["burst_loss"] = bool(stream_stat.loss_type.burst) retval["random_loss"] = bool(stream_stat.loss_type.random) retval["jitter"] = _pj_math_stat_to_dict(&stream_stat.jitter) return retval cdef str _ice_state_to_str(int state): if state == PJ_ICE_STRANS_STATE_NULL: return 'NULL' elif state == PJ_ICE_STRANS_STATE_INIT: return 'GATHERING' elif state == PJ_ICE_STRANS_STATE_READY: return 'GATHERING_COMPLETE' elif state == PJ_ICE_STRANS_STATE_SESS_READY: return 'NEGOTIATION_START' elif state == PJ_ICE_STRANS_STATE_NEGO: return 'NEGOTIATING' elif state == PJ_ICE_STRANS_STATE_RUNNING: return 'RUNNING' elif state == PJ_ICE_STRANS_STATE_FAILED: return 'FAILED' else: return 'UNKNOWN' cdef dict _extract_ice_session_data(pj_ice_sess *ice_sess): cdef dict data = dict() cdef pj_ice_sess_cand *cand cdef pj_ice_sess_check *check # Process local candidates local_candidates = [] for i in range(ice_sess.lcand_cnt): cand = &ice_sess.lcand[i] local_candidates.append(ICECandidate_create(cand)) data['local_candidates'] = local_candidates # Process remote candidates remote_candidates = [] for i in range(ice_sess.rcand_cnt): cand = &ice_sess.rcand[i] remote_candidates.append(ICECandidate_create(cand)) data['remote_candidates'] = remote_candidates # Process valid pairs valid_pairs = [] for i in range(ice_sess.comp_cnt): check = ice_sess.comp[i].valid_check valid_pairs.append(ICECheck_create(check)) data['valid_pairs'] = valid_pairs # Process valid list valid_list = [] for i in range(ice_sess.valid_list.count): check = &ice_sess.valid_list.checks[i] valid_list.append(ICECheck_create(check)) data['valid_list'] = valid_list return data cdef object _extract_rtp_transport(pjmedia_transport *tp): cdef void *rtp_transport_ptr = NULL if tp != NULL: rtp_transport_ptr = tp.user_data if rtp_transport_ptr == NULL: return None return ( rtp_transport_ptr)() # callback functions cdef void _RTPTransport_cb_ice_complete(pjmedia_transport *tp, pj_ice_strans_op op, int status) with gil: # Despite the name this callback is not only called when ICE negotiation ends, it depends on the # op parameter cdef int lock_status cdef double duration cdef pj_ice_strans *ice_st cdef pj_ice_sess *ice_sess cdef pj_time_val tv, start_time cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: lock_status = pj_mutex_lock(lock) if lock_status != 0: raise PJSIPError("failed to acquire lock", status) try: if op == PJ_ICE_STRANS_OP_NEGOTIATION: if status == 0: ice_st = pjmedia_ice_get_strans(tp) if ice_st == NULL: return ice_sess = pj_ice_strans_get_session(ice_st) if ice_sess == NULL: return start_time = pj_ice_strans_get_start_time(ice_st) pj_gettimeofday(&tv) tv.sec -= start_time.sec tv.msec -= start_time.msec pj_time_val_normalize(&tv) duration = (tv.sec*1000 + tv.msec)/1000.0 data = _extract_ice_session_data(ice_sess) rtp_transport._rtp_valid_pair = _get_rtp_valid_pair(ice_st) _add_event("RTPTransportICENegotiationDidSucceed", dict(obj=rtp_transport, duration=duration, local_candidates=data['local_candidates'], remote_candidates=data['remote_candidates'], valid_pairs=data['valid_pairs'], valid_list=data['valid_list'])) else: rtp_transport._rtp_valid_pair = None _add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status))) elif op == PJ_ICE_STRANS_OP_INIT: if status == 0: rtp_transport.state = "INIT" _add_event("RTPTransportDidInitialize", dict(obj=rtp_transport)) else: rtp_transport.state = "INVALID" _add_event("RTPTransportDidFail", dict(obj=rtp_transport, reason=_pj_status_to_str(status))) else: # silence compiler warning pass finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_ice_state(pjmedia_transport *tp, pj_ice_strans_state prev, pj_ice_strans_state curr) with gil: cdef int status cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: _add_event("RTPTransportICENegotiationStateDidChange", dict(obj=rtp_transport, prev_state=_ice_state_to_str(prev), state=_ice_state_to_str(curr))) finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_ice_stop(pjmedia_transport *tp, char *reason, int err) with gil: cdef int status cdef pj_mutex_t *lock cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return lock = rtp_transport._lock with nogil: status = pj_mutex_lock(lock) if status != 0: raise PJSIPError("failed to acquire lock", status) try: rtp_transport._rtp_valid_pair = None _reason = reason if _reason != b"media stop requested": _add_event("RTPTransportICENegotiationDidFail", dict(obj=rtp_transport, reason=_reason)) finally: with nogil: pj_mutex_unlock(lock) cdef void _RTPTransport_cb_zrtp_secure_on(pjmedia_transport *tp, char* cipher) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPSecureOn", dict(obj=rtp_transport, cipher=bytes(cipher))) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_secure_off(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPSecureOff", dict(obj=rtp_transport)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_show_sas(pjmedia_transport *tp, char* sas, int verified) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPReceivedSAS", dict(obj=rtp_transport, sas=str(sas), verified=bool(verified))) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_confirm_goclear(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: not yet implemented by PJSIP's ZRTP transport except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_show_message(pjmedia_transport *tp, int severity, int sub_code) with gil: global zrtp_message_levels, zrtp_error_messages cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return level = zrtp_message_levels.get(severity, 1) message = zrtp_error_messages[level].get(sub_code, 'Unknown') _add_event("RTPTransportZRTPLog", dict(obj=rtp_transport, level=level, message=message)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_negotiation_failed(pjmedia_transport *tp, int severity, int sub_code) with gil: global zrtp_message_levels, zrtp_error_messages cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return level = zrtp_message_levels.get(severity, 1) reason = zrtp_error_messages[level].get(sub_code, 'Unknown') _add_event("RTPTransportZRTPNegotiationFailed", dict(obj=rtp_transport, reason=reason)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_not_supported_by_other(pjmedia_transport *tp) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return _add_event("RTPTransportZRTPNotSupportedByRemote", dict(obj=rtp_transport)) except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_ask_enrollment(pjmedia_transport *tp, int info) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: implement PBX enrollment except: ua._handle_exception(1) cdef void _RTPTransport_cb_zrtp_inform_enrollment(pjmedia_transport *tp, int info) with gil: cdef RTPTransport rtp_transport cdef PJSIPUA ua try: ua = _get_ua() except: return try: rtp_transport = _extract_rtp_transport(tp) if rtp_transport is None: return # TODO: implement PBX enrollment except: ua._handle_exception(1) cdef void _AudioTransport_cb_dtmf(pjmedia_stream *stream, void *user_data, int digit) with gil: cdef AudioTransport audio_stream = ( user_data)() cdef PJSIPUA ua try: ua = _get_ua() except: return if audio_stream is None: return try: _add_event("RTPAudioStreamGotDTMF", dict(obj=audio_stream, digit=chr(digit))) except: ua._handle_exception(1) # globals cdef pjmedia_ice_cb _ice_cb _ice_cb.on_ice_complete = _RTPTransport_cb_ice_complete _ice_cb.on_ice_state = _RTPTransport_cb_ice_state _ice_cb.on_ice_stop = _RTPTransport_cb_ice_stop valid_sdp_directions = (b"sendrecv", b"sendonly", b"recvonly", b"inactive") # ZRTP cdef pjmedia_zrtp_cb _zrtp_cb _zrtp_cb.secure_on = _RTPTransport_cb_zrtp_secure_on _zrtp_cb.secure_off = _RTPTransport_cb_zrtp_secure_off _zrtp_cb.show_sas = _RTPTransport_cb_zrtp_show_sas _zrtp_cb.confirm_go_clear = _RTPTransport_cb_zrtp_confirm_goclear _zrtp_cb.show_message = _RTPTransport_cb_zrtp_show_message _zrtp_cb.negotiation_failed = _RTPTransport_cb_zrtp_negotiation_failed _zrtp_cb.not_supported_by_other = _RTPTransport_cb_zrtp_not_supported_by_other _zrtp_cb.ask_enrollment = _RTPTransport_cb_zrtp_ask_enrollment _zrtp_cb.inform_enrollment = _RTPTransport_cb_zrtp_inform_enrollment _zrtp_cb.sign_sas = NULL _zrtp_cb.check_sas_signature = NULL # Keep these aligned with ZrtpCodes.h cdef dict zrtp_message_levels = {1: 'INFO', 2: 'WARNING', 3: 'SEVERE', 4: 'ERROR'} cdef dict zrtp_error_messages = { 'INFO': { 0: "Unknown", 1: "Hello received and prepared a Commit, ready to get peer's hello hash", #InfoHelloReceived 2: "Commit: Generated a public DH key", #InfoCommitDHGenerated 3: "Responder: Commit received, preparing DHPart1", #InfoRespCommitReceived 4: "DH1Part: Generated a public DH key", #InfoDH1DHGenerated 5: "Initiator: DHPart1 received, preparing DHPart2", #InfoInitDH1Received 6: "Responder: DHPart2 received, preparing Confirm1", #InfoRespDH2Received 7: "Initiator: Confirm1 received, preparing Confirm2", #InfoInitConf1Received 8: "Responder: Confirm2 received, preparing Conf2Ack", #InfoRespConf2Received 9: "At least one retained secrets matches - security OK", #InfoRSMatchFound 10: "Entered secure state", #InfoSecureStateOn 11: "No more security for this session", #InfoSecureStateOff }, 'WARNING': { 0: "Unknown", 1: "WarningDHAESmismatch = 1, //!< Commit contains an AES256 cipher but does not offer a Diffie-Helman 4096 - not used DH4096 was discarded", #WarningDHAESmismatch 2: "Received a GoClear message", #WarningGoClearReceived 3: "Hello offers an AES256 cipher but does not offer a Diffie-Helman 4096- not used DH4096 was discarded", #WarningDHShort 4: "No retained shared secrets available - must verify SAS", #WarningNoRSMatch 5: "Internal ZRTP packet checksum mismatch - packet dropped", #WarningCRCmismatch 6: "Dropping packet because SRTP authentication failed!", #WarningSRTPauthError 7: "Dropping packet because SRTP replay check failed!", #WarningSRTPreplayError 8: "Valid retained shared secrets availabe but no matches found - must verify SAS", #WarningNoExpectedRSMatch }, 'SEVERE': { 0: "Unknown", 1: "Hash HMAC check of Hello failed!", #SevereHelloHMACFailed 2: "Hash HMAC check of Commit failed!", #SevereCommitHMACFailed 3: "Hash HMAC check of DHPart1 failed!", #SevereDH1HMACFailed 4: "Hash HMAC check of DHPart2 failed!", #SevereDH2HMACFailed 5: "Cannot send data - connection or peer down?", #SevereCannotSend 6: "Internal protocol error occured!", #SevereProtocolError 7: "Cannot start a timer - internal resources exhausted?", #SevereNoTimer 8: "Too much retries during ZRTP negotiation - connection or peer down?", #SevereTooMuchRetries }, 'ERROR': { 0x00: "Unknown", 0x10: "Malformed packet (CRC OK, but wrong structure)", #MalformedPacket 0x20: "Critical software error", #CriticalSWError 0x30: "Unsupported ZRTP version", #UnsuppZRTPVersion 0x40: "Hello components mismatch", #HelloCompMismatch 0x51: "Hash type not supported", #UnsuppHashType 0x52: "Cipher type not supported", #UnsuppCiphertype 0x53: "Public key exchange not supported", #UnsuppPKExchange 0x54: "SRTP auth. tag not supported", #UnsuppSRTPAuthTag 0x55: "SAS scheme not supported", #UnsuppSASScheme 0x56: "No shared secret available, DH mode required", #NoSharedSecret 0x61: "DH Error: bad pvi or pvr ( == 1, 0, or p-1)", #DHErrorWrongPV 0x62: "DH Error: hvi != hashed data", #DHErrorWrongHVI 0x63: "Received relayed SAS from untrusted MiTM", #SASuntrustedMiTM 0x70: "Auth. Error: Bad Confirm pkt HMAC", #ConfirmHMACWrong 0x80: "Nonce reuse", #NonceReused 0x90: "Equal ZIDs in Hello", #EqualZIDHello 0x100: "GoClear packet received, but not allowed", #GoCleatNotAllowed 0x7fffffff: "Packet ignored", #IgnorePacket } } diff --git a/sipsimple/core/_core.request.pxi b/sipsimple/core/_core.request.pxi index 0455188f..5bd72428 100644 --- a/sipsimple/core/_core.request.pxi +++ b/sipsimple/core/_core.request.pxi @@ -1,503 +1,503 @@ from datetime import datetime, timedelta cdef class EndpointAddress: def __init__(self, ip, port): self.ip = ip self.port = port def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self.ip, self.port) def __str__(self): return "%s:%d" % (self.ip, self.port) cdef class Request: expire_warning_time = 30 # properties property method: def __get__(self): return self._method.str property call_id: def __get__(self): return self._call_id.str property content_type: def __get__(self): if self._content_type is None: return None else: return "/".join([self._content_type.str, self._content_subtype.str]) property body: def __get__(self): if self._body is None: return None else: return self._body.str property expires_in: def __get__(self): cdef object dt self._get_ua() if self.state != "EXPIRING" or self._expire_time is None: return 0 else: dt = self._expire_time - datetime.now() return max(0, dt.seconds) # public methods def __cinit__(self, *args, **kwargs): self.state = "INIT" self.peer_address = None pj_timer_entry_init(&self._timer, 0, self, _Request_cb_timer) self._timer_active = 0 def __init__(self, method, SIPURI request_uri not None, FromHeader from_header not None, ToHeader to_header not None, RouteHeader route_header not None, Credentials credentials=None, ContactHeader contact_header=None, call_id=None, cseq=None, object extra_headers=None, content_type=None, body=None): cdef pjsip_method method_pj cdef PJSTR from_header_str cdef PJSTR to_header_str cdef PJSTR request_uri_str cdef PJSTR contact_header_str cdef pj_str_t *contact_header_pj = NULL cdef pj_str_t *call_id_pj = NULL cdef object content_type_spl cdef pjsip_hdr *hdr cdef pjsip_contact_hdr *contact_hdr cdef pjsip_cid_hdr *cid_hdr cdef pjsip_cseq_hdr *cseq_hdr cdef int status cdef dict event_dict cdef PJSIPUA ua = _get_ua() if self._tsx != NULL or self.state != "INIT": raise SIPCoreError("Request.__init__() was already called") if cseq is not None and cseq < 0: raise ValueError("cseq argument cannot be negative") if extra_headers is not None: header_names = set([header.name for header in extra_headers]) if "Route" in header_names: raise ValueError("Route should be specified with route_header argument, not extra_headers") if "Content-Type" in header_names: raise ValueError("Content-Type should be specified with content_type argument, not extra_headers") else: header_names = () if content_type is not None and body is None: raise ValueError("Cannot specify a content_type without a body") if content_type is None and body is not None: raise ValueError("Cannot specify a body without a content_type") self._method = PJSTR(method) pjsip_method_init_np(&method_pj, &self._method.pj_str) if credentials is not None: self.credentials = FrozenCredentials.new(credentials) from_header_str = PJSTR(from_header.body) self.to_header = FrozenToHeader.new(to_header) to_header_str = PJSTR(to_header.body) self.request_uri = FrozenSIPURI.new(request_uri) request_uri_str = PJSTR(str(request_uri)) self.route_header = FrozenRouteHeader.new(route_header) self.route_header.uri.parameters.dict["lr"] = None # always send lr parameter in Route header - #self.route_header.uri.parameters.dict["hide"] = None # always hide Route header + self.route_header.uri.parameters.dict["hide"] = None # always hide Route header if contact_header is not None: self.contact_header = FrozenContactHeader.new(contact_header) contact_parameters = contact_header.parameters.copy() contact_parameters.pop("q", None) contact_parameters.pop("expires", None) contact_header.parameters = {} contact_header_str = PJSTR(contact_header.body) contact_header_pj = &contact_header_str.pj_str if call_id is not None: self._call_id = PJSTR(call_id) call_id_pj = &self._call_id.pj_str if cseq is None: self.cseq = -1 else: self.cseq = cseq if extra_headers is None: self.extra_headers = frozenlist() else: self.extra_headers = frozenlist([header.frozen_type.new(header) for header in extra_headers]) if body is not None: content_type_spl = content_type.split("/", 1) self._content_type = PJSTR(content_type_spl[0]) self._content_subtype = PJSTR(content_type_spl[1]) self._body = PJSTR(body) status = pjsip_endpt_create_request(ua._pjsip_endpoint._obj, &method_pj, &request_uri_str.pj_str, &from_header_str.pj_str, &to_header_str.pj_str, contact_header_pj, call_id_pj, self.cseq, NULL, &self._tdata) if status != 0: raise PJSIPError("Could not create request", status) if body is not None: self._tdata.msg.body = pjsip_msg_body_create(self._tdata.pool, &self._content_type.pj_str, &self._content_subtype.pj_str, &self._body.pj_str) status = _BaseRouteHeader_to_pjsip_route_hdr(self.route_header, &self._route_header, self._tdata.pool) pjsip_msg_add_hdr(self._tdata.msg, &self._route_header) hdr = ( &self._tdata.msg.hdr).next while hdr != &self._tdata.msg.hdr: hdr_name = _pj_str_to_str(hdr.name) if hdr_name in header_names: raise ValueError("Cannot override %s header value in extra_headers" % _pj_str_to_bytes(hdr.name)) if hdr.type == PJSIP_H_CONTACT: contact_hdr = hdr _dict_to_pjsip_param(contact_parameters, &contact_hdr.other_param, self._tdata.pool) elif hdr.type == PJSIP_H_CALL_ID: cid_hdr = hdr self._call_id = PJSTR(_pj_str_to_str(cid_hdr.id)) elif hdr.type == PJSIP_H_CSEQ: cseq_hdr = hdr self.cseq = cseq_hdr.cseq elif hdr.type == PJSIP_H_FROM: self.from_header = FrozenFromHeader_create( hdr) else: pass hdr = ( hdr).next _add_headers_to_tdata(self._tdata, self.extra_headers) #event_dict = dict(obj=self) #_pjsip_msg_to_dict(self._tdata.msg, event_dict) #print('Request dict %s' % event_dict) if self.credentials is not None: status = pjsip_auth_clt_init(&self._auth, ua._pjsip_endpoint._obj, self._tdata.pool, 0) if status != 0: raise PJSIPError("Could not init authentication credentials", status) status = pjsip_auth_clt_set_credentials(&self._auth, 1, self.credentials.get_cred_info()) if status != 0: raise PJSIPError("Could not set authentication credentials", status) self._need_auth = 1 else: self._need_auth = 0 status = pjsip_tsx_create_uac(&ua._module, self._tdata, &self._tsx) if status != 0: raise PJSIPError("Could not create transaction for request", status) self._tsx.mod_data[ua._module.id] = self def __dealloc__(self): cdef PJSIPUA ua = self._get_ua() if self._tsx != NULL: self._tsx.mod_data[ua._module.id] = NULL if self._tsx.state < PJSIP_TSX_STATE_COMPLETED: pjsip_tsx_terminate(self._tsx, 500) self._tsx = NULL if self._tdata != NULL: pjsip_tx_data_dec_ref(self._tdata) self._tdata = NULL if self._timer_active: pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._timer) self._timer_active = 0 def send(self, timeout=None): cdef pj_time_val timeout_pj cdef int status cdef PJSIPUA ua = self._get_ua() if self.state != "INIT": raise SIPCoreError('This method may only be called in the "INIT" state, current state is "%s"' % self.state) if timeout is not None: if timeout <= 0: raise ValueError("Timeout value cannot be negative") timeout_pj.sec = int(timeout) timeout_pj.msec = (timeout * 1000) % 1000 self._timeout = timeout status = pjsip_tsx_send_msg(self._tsx, self._tdata) if status != 0: raise PJSIPError("Could not send request", status) pjsip_tx_data_add_ref(self._tdata) if timeout: status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._timer, &timeout_pj) if status == 0: self._timer_active = 1 self.state = "IN_PROGRESS" def end(self): cdef PJSIPUA ua = self._get_ua() if self.state == "IN_PROGRESS": pjsip_tsx_terminate(self._tsx, 408) elif self.state == "EXPIRING": pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._timer) self._timer_active = 0 self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) # private methods cdef PJSIPUA _get_ua(self): cdef PJSIPUA ua try: ua = _get_ua() except SIPCoreError: self._tsx = NULL self._tdata = NULL self._timer_active = 0 self.state = "TERMINATED" return None else: return ua cdef int _cb_tsx_state(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: cdef pjsip_tx_data *tdata_auth cdef pjsip_transaction *tsx_auth cdef pjsip_cseq_hdr *cseq cdef dict event_dict cdef int expires = -1 cdef SIPURI contact_uri cdef dict contact_params cdef pj_time_val timeout_pj cdef int status if rdata != NULL: self.to_header = FrozenToHeader_create(rdata.msg_info.to_hdr) if self.peer_address is None: self.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: self.peer_address.ip = rdata.pkt_info.src_name self.peer_address.port = rdata.pkt_info.src_port if self._tsx.state == PJSIP_TSX_STATE_PROCEEDING: if rdata == NULL: return 0 event_dict = dict(obj=self) _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) _add_event("SIPRequestGotProvisionalResponse", event_dict) elif self._tsx.state == PJSIP_TSX_STATE_COMPLETED: if self._timer_active: pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._timer) self._timer_active = 0 if self._need_auth and self._tsx.status_code in [401, 407]: self._need_auth = 0 status = pjsip_auth_clt_reinit_req(&self._auth, rdata, self._tdata, &tdata_auth) if status != 0: _add_event("SIPRequestDidFail", dict(obj=self, code=0, reason="Could not add auth data to request %s" % _pj_status_to_str(status))) self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) return 0 cseq = pjsip_msg_find_hdr(tdata_auth.msg, PJSIP_H_CSEQ, NULL) if cseq != NULL: cseq.cseq += 1 self.cseq = cseq.cseq status = pjsip_tsx_create_uac(&ua._module, tdata_auth, &tsx_auth) if status != 0: pjsip_tx_data_dec_ref(tdata_auth) _add_event("SIPRequestDidFail", dict(obj=self, code=0, reason="Could not create transaction for request with auth %s" % _pj_status_to_str(status))) self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) return 0 self._tsx.mod_data[ua._module.id] = NULL self._tsx = tsx_auth self._tsx.mod_data[ua._module.id] = self status = pjsip_tsx_send_msg(self._tsx, tdata_auth) if status != 0: pjsip_tx_data_dec_ref(tdata_auth) _add_event("SIPRequestDidFail", dict(obj=self, code=0, reason="Could not send request with auth %s" % _pj_status_to_str(status))) self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) return 0 elif self._timeout is not None: timeout_pj.sec = int(self._timeout) timeout_pj.msec = (self._timeout * 1000) % 1000 status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._timer, &timeout_pj) if status == 0: self._timer_active = 1 else: event_dict = dict(obj=self) if rdata != NULL: # This shouldn't happen, but safety fist! _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) if self._tsx.status_code / 100 == 2: if rdata != NULL: if "Expires" in event_dict["headers"]: expires = event_dict["headers"]["Expires"] elif self.contact_header is not None: for contact_header in event_dict["headers"].get("Contact", []): if contact_header.uri == self.contact_header.uri and contact_header.expires is not None: expires = contact_header.expires if expires == -1: expires = 0 for header in self.extra_headers: if header.name == "Expires": try: expires = int(header.body) except ValueError: pass break event_dict["expires"] = expires self._expire_time = datetime.now() + timedelta(seconds=expires) _add_event("SIPRequestDidSucceed", event_dict) else: expires = 0 _add_event("SIPRequestDidFail", event_dict) if expires == 0: self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) else: timeout_pj.sec = max(1, expires - self.expire_warning_time, expires/2) timeout_pj.msec = 0 status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._timer, &timeout_pj) if status == 0: self._timer_active = 1 self.state = "EXPIRING" self._expire_rest = max(1, expires - timeout_pj.sec) else: self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) elif self._tsx.state == PJSIP_TSX_STATE_TERMINATED: if self.state == "IN_PROGRESS": if self._timer_active: pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._timer) self._timer_active = 0 _add_event("SIPRequestDidFail", dict(obj=self, code=self._tsx.status_code, reason=_pj_str_to_bytes(self._tsx.status_text))) self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) self._tsx.mod_data[ua._module.id] = NULL self._tsx = NULL else: pass cdef int _cb_timer(self, PJSIPUA ua) except -1: cdef pj_time_val expires cdef int status if self.state == "IN_PROGRESS": pjsip_tsx_terminate(self._tsx, 408) elif self.state == "EXPIRING": if self._expire_rest > 0: _add_event("SIPRequestWillExpire", dict(obj=self, expires=self._expire_rest)) expires.sec = self._expire_rest expires.msec = 0 self._expire_rest = 0 status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._timer, &expires) if status == 0: self._timer_active = 1 else: self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) else: self.state = "TERMINATED" _add_event("SIPRequestDidEnd", dict(obj=self)) return 0 cdef class IncomingRequest: def __cinit__(self, *args, **kwargs): self.peer_address = None def __dealloc__(self): cdef PJSIPUA ua try: ua = _get_ua() except SIPCoreError: return if self._tsx != NULL: pjsip_tsx_terminate(self._tsx, 500) self._tsx = NULL if self._tdata != NULL: pjsip_tx_data_dec_ref(self._tdata) self._tdata = NULL def answer(self, int code, str reason=None, object extra_headers=None): cdef bytes reason_bytes cdef dict event_dict cdef int status cdef PJSIPUA ua = _get_ua() if self.state != "incoming": raise SIPCoreInvalidStateError('Can only answer an incoming request in the "incoming" state, ' 'object is currently in the "%s" state' % self.state) if code < 200 or code >= 700: raise ValueError("Invalid SIP final response code: %d" % code) self._tdata.msg.line.status.code = code if reason is None: self._tdata.msg.line.status.reason = pjsip_get_status_text(code)[0] else: pj_strdup2_with_null(self._tdata.pool, &self._tdata.msg.line.status.reason, reason.encode()) if extra_headers is not None: _add_headers_to_tdata(self._tdata, extra_headers) event_dict = dict(obj=self) _pjsip_msg_to_dict(self._tdata.msg, event_dict) status = pjsip_tsx_send_msg(self._tsx, self._tdata) if status != 0: raise PJSIPError("Could not send response", status) self.state = "answered" self._tdata = NULL self._tsx = NULL _add_event("SIPIncomingRequestSentResponse", event_dict) cdef int init(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: cdef dict event_dict cdef int status status = pjsip_endpt_create_response(ua._pjsip_endpoint._obj, rdata, 500, NULL, &self._tdata) if status != 0: raise PJSIPError("Could not create response", status) status = pjsip_tsx_create_uas(&ua._module, rdata, &self._tsx) if status != 0: pjsip_tx_data_dec_ref(self._tdata) self._tdata = NULL raise PJSIPError("Could not create transaction for incoming request", status) pjsip_tsx_recv_msg(self._tsx, rdata) self.state = "incoming" self.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) event_dict = dict(obj=self) _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) _add_event("SIPIncomingRequestGotRequest", event_dict) # callback functions cdef void _Request_cb_tsx_state(pjsip_transaction *tsx, pjsip_event *event) with gil: cdef PJSIPUA ua cdef void *req_ptr cdef Request req cdef pjsip_rx_data *rdata = NULL try: ua = _get_ua() except: return try: req_ptr = tsx.mod_data[ua._module.id] if req_ptr != NULL: req = req_ptr if event.type == PJSIP_EVENT_RX_MSG: rdata = event.body.rx_msg.rdata elif event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.type == PJSIP_EVENT_RX_MSG: rdata = event.body.tsx_state.src.rdata req._cb_tsx_state(ua, rdata) except: ua._handle_exception(1) cdef void _Request_cb_timer(pj_timer_heap_t *timer_heap, pj_timer_entry *entry) with gil: cdef PJSIPUA ua cdef Request req try: ua = _get_ua() except: return try: if entry.user_data != NULL: req = entry.user_data req._timer_active = 0 req._cb_timer(ua) except: ua._handle_exception(1) diff --git a/sipsimple/core/_core.util.pxi b/sipsimple/core/_core.util.pxi index bd992658..0ccf102c 100644 --- a/sipsimple/core/_core.util.pxi +++ b/sipsimple/core/_core.util.pxi @@ -1,441 +1,441 @@ import platform import re import sys from application.version import Version cdef class PJSTR: def __cinit__(self, str): self.str = str _str_to_pj_str(str, &self.pj_str) def __str__(self): return self.str cdef class SIPStatusMessages: cdef object _default_status def __cinit__(self, *args, **kwargs): self._default_status = _pj_str_to_str(pjsip_get_status_text(0)[0]) def __getitem__(self, int val): cdef object _status _status = _pj_str_to_str(pjsip_get_status_text(val)[0]) if _status == self._default_status: raise IndexError("Unknown SIP response code: %d" % val) return _status cdef class frozenlist: def __cinit__(self, *args, **kw): self.list = list() self.initialized = 0 self.hash = 0 def __init__(self, *args, **kw): if not self.initialized: self.list = list(*args, **kw) self.initialized = 1 self.hash = hash(tuple(self.list)) def __reduce__(self): return (self.__class__, (self.list,), None) def __repr__(self): return "frozenlist(%r)" % self.list def __len__(self): return self.list.__len__() def __hash__(self): return self.hash def __iter__(self): return self.list.__iter__() def __cmp__(self, frozenlist other): return self.list.__cmp__(other.list) def __richcmp__(frozenlist self, other, op): if isinstance(other, frozenlist): other = (other).list if op == 0: return self.list.__cmp__(other) < 0 elif op == 1: return self.list.__cmp__(other) <= 0 elif op == 2: return self.list.__eq__(other) elif op == 3: return self.list.__ne__(other) elif op == 4: return self.list.__cmp__(other) > 0 elif op == 5: return self.list.__cmp__(other) >= 0 else: return NotImplemented def __contains__(self, item): return self.list.__contains__(item) def __getitem__(self, key): return self.list.__getitem__(key) def __add__(first, second): if isinstance(first, frozenlist): first = (first).list if isinstance(second, frozenlist): second = (second).list return frozenlist(first+second) def __mul__(first, second): if isinstance(first, frozenlist): first = (first).list if isinstance(second, frozenlist): second = (second).list return frozenlist(first*second) def __reversed__(self): return self.list.__reversed__() def count(self, elem): return self.list.count(elem) def index(self, elem): return self.list.index(elem) cdef class frozendict: def __cinit__(self, *args, **kw): self.dict = dict() self.initialized = 0 def __init__(self, *args, **kw): if not self.initialized: self.dict = dict(*args, **kw) self.initialized = 1 self.hash = hash(tuple(self.dict.iteritems())) def __reduce__(self): return (self.__class__, (self.dict,), None) def __repr__(self): return "frozendict(%r)" % self.dict def __len__(self): return self.dict.__len__() def __hash__(self): return self.hash def __iter__(self): return self.dict.__iter__() def __cmp__(self, frozendict other): return self.dict.__cmp__(other.dict) def __richcmp__(frozendict self, other, op): if isinstance(other, frozendict): other = (other).dict if op == 0: return self.dict.__cmp__(other) < 0 elif op == 1: return self.dict.__cmp__(other) <= 0 elif op == 2: return self.dict.__eq__(other) elif op == 3: return self.dict.__ne__(other) elif op == 4: return self.dict.__cmp__(other) > 0 elif op == 5: return self.dict.__cmp__(other) >= 0 else: return NotImplemented def __contains__(self, item): return self.dict.__contains__(item) def __getitem__(self, key): return self.dict.__getitem__(key) def copy(self): return self def get(self, *args): return self.dict.get(*args) def has_key(self, key): return self.dict.has_key(key) def items(self): return self.dict.items() def iteritems(self): return self.dict.iteritems() def iterkeys(self): return self.dict.iterkeys() def itervalues(self): return self.dict.itervalues() def keys(self): return self.dict.keys() def values(self): return self.dict.values() # functions cdef int _str_to_pj_str(object string, pj_str_t *pj_str) except -1: # Feed data from Python to PJSIP # TODO: convert to Python3 bytes_string = string.encode() pj_str.ptr = PyBytes_AsString(bytes_string) pj_str.slen = len(bytes_string) print("Encoded STR %s to PJS %s" % (string, pj_str.ptr)) cdef object _pj_str_to_str(pj_str_t pj_str): # Feed data from PJSIP to the Python bytes_string = PyBytes_FromStringAndSize(pj_str.ptr, pj_str.slen) string = bytes_string.decode() print("Decoded PJS %s to STR %s" % (pj_str.ptr, string)) return string cdef object _pj_str_to_str(pj_str_t pj_str): return PyBytes_FromStringAndSize(pj_str.ptr, pj_str.slen).decode() cdef object _pj_buf_len_to_str(object buf, int buf_len): # TODO: convert to Python3 return PyBytes_FromStringAndSize(buf, buf_len) cdef object _buf_to_str(object buf): # TODO: convert to Python3 return PyBytes_FromString(buf) cdef object _str_as_str(object string): # TODO: convert to Python3 return PyBytes_AsString(string) cdef object _str_as_size(object string): # TODO: convert to Python3 return PyBytes_Size(string) cdef object _pj_status_to_str(int status): cdef char buf[PJ_ERR_MSG_SIZE] return _pj_str_to_str(pj_strerror(status, buf, PJ_ERR_MSG_SIZE)) cdef object _pj_status_to_def(int status): return _re_pj_status_str_def.match(_pj_status_to_str(status)).group(1) cdef dict _pjsip_param_to_dict(pjsip_param *param_list): cdef pjsip_param *param cdef dict retval = dict() param = ( param_list).next while param != param_list: if param.value.slen == 0: retval[_pj_str_to_str(param.name)] = None else: retval[_pj_str_to_str(param.name)] = _pj_str_to_str(param.value) param = ( param).next return retval cdef int _dict_to_pjsip_param(object params, pjsip_param *param_list, pj_pool_t *pool): cdef pjsip_param *param = NULL for name, value in params.iteritems(): param = pj_pool_alloc(pool, sizeof(pjsip_param)) if param == NULL: return -1 _str_to_pj_str(name.encode(), ¶m.name) if value is None: param.value.slen = 0 else: _str_to_pj_str(value.encode(), ¶m.value) pj_list_insert_after( param_list, param) return 0 cdef int _pjsip_msg_to_dict(pjsip_msg *msg, dict info_dict) except -1: cdef pjsip_msg_body *body cdef pjsip_hdr *header cdef pjsip_generic_array_hdr *array_header cdef pjsip_ctype_hdr *ctype_header cdef pjsip_cseq_hdr *cseq_header cdef char *buf cdef int buf_len, i, status headers = {} header = ( &msg.hdr).next while header != &msg.hdr: header_name = _pj_str_to_str(header.name) header_data = None multi_header = False if header_name in ("Accept", "Allow", "Require", "Supported", "Unsupported", "Allow-Events"): array_header = header header_data = [] for i from 0 <= i < array_header.count: pass # TODO crash here #header_data.append(_pj_str_to_str(array_header.values[i])) elif header_name == "Contact": multi_header = True header_data = FrozenContactHeader_create( header) elif header_name == "Content-Length": header_data = ( header).len elif header_name == "Content-Type": header_data = FrozenContentTypeHeader_create( header) elif header_name == "CSeq": cseq_header = header header_data = (cseq_header.cseq, _pj_str_to_str(cseq_header.method.name)) elif header_name in ("Expires", "Max-Forwards", "Min-Expires"): header_data = ( header).ivalue elif header_name == "From": header_data = FrozenFromHeader_create( header) elif header_name == "To": header_data = FrozenToHeader_create( header) elif header_name == "Route": multi_header = True header_data = FrozenRouteHeader_create( header) elif header_name == "Reason": value = _pj_str_to_str((header).hvalue) protocol, sep, params_str = value.partition(';') params = frozendict([(name, value or None) for name, sep, value in [param.partition('=') for param in params_str.split(';')]]) header_data = FrozenReasonHeader(protocol, params) elif header_name == "Record-Route": multi_header = True header_data = FrozenRecordRouteHeader_create( header) elif header_name == "Retry-After": header_data = FrozenRetryAfterHeader_create( header) elif header_name == "Via": multi_header = True header_data = FrozenViaHeader_create( header) elif header_name == "Warning": match = _re_warning_hdr.match(_pj_str_to_str((header).hvalue)) if match is not None: warning_params = match.groupdict() warning_params['code'] = int(warning_params['code']) header_data = FrozenWarningHeader(**warning_params) elif header_name == "Event": header_data = FrozenEventHeader_create( header) elif header_name == "Subscription-State": header_data = FrozenSubscriptionStateHeader_create( header) elif header_name == "Refer-To": header_data = FrozenReferToHeader_create( header) elif header_name == "Subject": header_data = FrozenSubjectHeader_create( header) elif header_name == "Replaces": header_data = FrozenReplacesHeader_create( header) # skip the following headers: elif header_name not in ("Authorization", "Proxy-Authenticate", "Proxy-Authorization", "WWW-Authenticate"): header_data = FrozenHeader(header_name, _pj_str_to_str(( header).hvalue)) if header_data is not None: if multi_header: headers.setdefault(header_name, []).append(header_data) else: if header_name not in headers: headers[header_name] = header_data header = ( header).next info_dict["headers"] = headers body = msg.body if body == NULL: info_dict["body"] = None else: status = pjsip_print_body(body, &buf, &buf_len) if status != 0: info_dict["body"] = None else: info_dict["body"] = _pj_buf_len_to_str(buf, buf_len) if msg.type == PJSIP_REQUEST_MSG: info_dict["method"] = _pj_str_to_str(msg.line.req.method.name) # You need to call pjsip_uri_get_uri on the request URI if the message is for transmitting, # but it isn't required if message is one received. Otherwise, a seg fault occurs. Don't ask. info_dict["request_uri"] = FrozenSIPURI_create(pjsip_uri_get_uri(msg.line.req.uri)) else: info_dict["code"] = msg.line.status.code info_dict["reason"] = _pj_str_to_str(msg.line.status.reason) return 0 cdef int _is_valid_ip(int af, object ip) except -1: cdef char buf[16] cdef pj_str_t src cdef int status _str_to_pj_str(ip.encode(), &src) status = pj_inet_pton(af, &src, buf) if status == 0: return 1 else: return 0 cdef int _get_ip_version(object ip) except -1: if _is_valid_ip(pj_AF_INET(), ip): return pj_AF_INET() elif _is_valid_ip(pj_AF_INET6(), ip): return pj_AF_INET() else: return 0 cdef int _add_headers_to_tdata(pjsip_tx_data *tdata, object headers) except -1: cdef pj_str_t name_pj, value_pj cdef pjsip_hdr *hdr for header in headers: _str_to_pj_str(header.name, &name_pj) _str_to_pj_str(header.body, &value_pj) hdr = pjsip_generic_string_hdr_create(tdata.pool, &name_pj, &value_pj) pjsip_msg_add_hdr(tdata.msg, hdr) cdef int _remove_headers_from_tdata(pjsip_tx_data *tdata, object headers) except -1: cdef pj_str_t header_name_pj cdef pjsip_hdr *hdr for header in headers: _str_to_pj_str(header.encode(), &header_name_pj) hdr = pjsip_msg_find_remove_hdr_by_name(tdata.msg, &header_name_pj, NULL) cdef int _BaseRouteHeader_to_pjsip_route_hdr(BaseIdentityHeader header, pjsip_route_hdr *pj_header, pj_pool_t *pool) except -1: cdef pjsip_param *param cdef pjsip_sip_uri *sip_uri pjsip_route_hdr_init(NULL, pj_header) sip_uri = pj_pool_alloc(pool, sizeof(pjsip_sip_uri)) _BaseSIPURI_to_pjsip_sip_uri(header.uri, sip_uri, pool) pj_header.name_addr.uri = sip_uri if header.display_name: _str_to_pj_str(header.display_name, &pj_header.name_addr.display) _dict_to_pjsip_param(header.parameters, &pj_header.other_param, pool) return 0 cdef int _BaseSIPURI_to_pjsip_sip_uri(BaseSIPURI uri, pjsip_sip_uri *pj_uri, pj_pool_t *pool) except -1: cdef pjsip_param *param pjsip_sip_uri_init(pj_uri, uri.secure) if uri.user: _str_to_pj_str(uri.user, &pj_uri.user) if uri.password: _str_to_pj_str(uri.password, &pj_uri.passwd) if uri.host: _str_to_pj_str(uri.host, &pj_uri.host) if uri.port: pj_uri.port = uri.port for name, value in uri.parameters.iteritems(): if name == "lr": pj_uri.lr_param = 1 elif name == "maddr": _str_to_pj_str(value, &pj_uri.maddr_param) elif name == "method": _str_to_pj_str(value, &pj_uri.method_param) elif name == "transport": _str_to_pj_str(value, &pj_uri.transport_param) elif name == "ttl": pj_uri.ttl_param = int(value) elif name == "user": _str_to_pj_str(value, &pj_uri.user_param) else: param = pj_pool_alloc(pool, sizeof(pjsip_param)) - _str_to_pj_str(name.encode(), ¶m.name) + _str_to_pj_str(name, ¶m.name) if value is None: param.value.slen = 0 else: _str_to_pj_str(value, ¶m.value) pj_list_insert_after( &pj_uri.other_param, param) _dict_to_pjsip_param(uri.headers, &pj_uri.header_param, pool) return 0 cdef int _BaseRouteHeader_to_pjsip_route_hdr(BaseIdentityHeader header, pjsip_route_hdr *pj_header, pj_pool_t *pool) except -1: cdef pjsip_param *param cdef pjsip_sip_uri *sip_uri pjsip_route_hdr_init(NULL, pj_header) sip_uri = pj_pool_alloc(pool, sizeof(pjsip_sip_uri)) _BaseSIPURI_to_pjsip_sip_uri(header.uri, sip_uri, pool) pj_header.name_addr.uri = sip_uri if header.display_name: _str_to_pj_str(header.display_name.encode('utf-8'), &pj_header.name_addr.display) _dict_to_pjsip_param(header.parameters, &pj_header.other_param, pool) return 0 def _get_device_name_encoding(): if sys.platform == 'win32': encoding = 'mbcs' elif sys.platform.startswith('linux2') and Version.parse(platform.release()) < Version(2,6,31): encoding = 'latin1' else: encoding = 'utf-8' return encoding _device_name_encoding = _get_device_name_encoding() def decode_device_name(device_name): # ignore decoding errors, some systems (I'm looking at you, OSX), seem to misbehave return device_name.decode(_device_name_encoding, 'ignore') # globals cdef object _re_pj_status_str_def = re.compile("^.*\((.*)\)$") cdef object _re_warning_hdr = re.compile('(?P[0-9]{3}) (?P.*?) "(?P.*?)"') sip_status_messages = SIPStatusMessages() diff --git a/sipsimple/session.py b/sipsimple/session.py index b9d173f4..316dd8b3 100644 --- a/sipsimple/session.py +++ b/sipsimple/session.py @@ -1,2737 +1,2740 @@ """ Implements an asynchronous notification based mechanism for establishment, modification and termination of sessions using Session Initiation Protocol (SIP) standardized in RFC3261. """ __all__ = ['Session', 'SessionManager'] import random from threading import RLock from time import time from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton from application.system import host from eventlib import api, coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple import log from sipsimple.account import AccountManager, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import DialogID, Engine, Invitation, Referral, Subscription, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, SIPURI, sip_status_messages, sipfrag_re from sipsimple.core import ContactHeader, FromHeader, Header, ReasonHeader, ReferToHeader, ReplacesHeader, RouteHeader, ToHeader, WarningHeader from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession from sipsimple.core import PublicGRUU, PublicGRUUIfAvailable, NoGRUU from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class TerminateReferral(Exception): pass class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class IllegalStateError(RuntimeError): pass class IllegalDirectionError(RuntimeError): pass class SIPInvitationTransferDidFail(Exception): def __init__(self, data): self.data = data @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker @decorator def check_transfer_state(direction, state): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.transfer_handler.direction != direction: raise IllegalDirectionError('cannot transfer in %s direction' % obj.transfer_handler.direction) if obj.transfer_handler.state != state: raise IllegalStateError('cannot transfer in %s state' % obj.transfer_handler.state) return func(obj, *args, **kwargs) return wrapper return state_checker class AddParticipantOperation(object): pass class RemoveParticipantOperation(object): pass @implementer(IObserver) class ReferralHandler(object): def __init__(self, session, participant_uri, operation): self.participant_uri = participant_uri if not isinstance(self.participant_uri, SIPURI): if not self.participant_uri.startswith(('sip:', 'sips:')): self.participant_uri = 'sip:%s' % self.participant_uri try: self.participant_uri = SIPURI.parse(self.participant_uri) except SIPCoreError: notification_center = NotificationCenter() if operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) return self.session = session self.operation = operation self.active = False self.route = None self._channel = coros.queue() self._referral = None def start(self): notification_center = NotificationCenter() if not self.session.remote_focus: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) self.session = None return notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise ReferralError(error='DNS lookup failed: %s' % e) target_uri = SIPURI.new(self.session.remote_identity.uri) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue refer_to_header = ReferToHeader(str(self.participant_uri)) refer_to_header.parameters['method'] = 'INVITE' if self.operation is AddParticipantOperation else 'BYE' referral = Referral(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralGotNotify': if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') if code/100 > 2: continue if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceGotAddParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) else: notification_center.post_notification('SIPConferenceGotRemoveParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) elif notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) else: notification_center.post_notification('SIPConferenceDidRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) finally: self.active = False except TerminateReferral: if self._referral is not None: try: self._referral.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail: pass finally: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) except ReferralError as e: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) finally: notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self.session = None self._referral = None def _refresh(self): try: contact_header = ContactHeader(self.session.account.contact[NoGRUU, self.route]) except KeyError: pass else: try: self._referral.refresh(contact_header=contact_header, timeout=2) except (SIPCoreError, SIPCoreInvalidStateError): pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_SIPSessionDidFail(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_SIPSessionWillEnd(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() @implementer(IObserver) class ConferenceHandler(object): def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) @run_in_green_thread def add_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, AddParticipantOperation) referral_handler.start() @run_in_green_thread def remove_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, RemoveParticipantOperation) referral_handler.start() def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) default_interval = 600 if account is BonjourAccount() else account.sip.subscribe_interval refresh_interval = getattr(command, 'refresh_interval', default_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError as e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class TransferInfo(object): def __init__(self, referred_by=None, replaced_dialog_id=None): self.referred_by = referred_by self.replaced_dialog_id = replaced_dialog_id @implementer(IObserver) class TransferHandler(object): def __init__(self, session): self.state = None self.direction = None self.new_session = None self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session._invitation) self._command_channel = coros.queue() self._data_channel = coros.queue() self._proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) self.direction = None self.state = None def _CH_incoming_transfer(self, command): self.direction = 'incoming' notification_center = NotificationCenter() refer_to_hdr = command.data.headers.get('Refer-To') target = SIPURI.parse(refer_to_hdr.uri) referred_by_hdr = command.data.headers.get('Referred-By', None) if referred_by_hdr is not None: origin = referred_by_hdr.body else: origin = str(self.session.remote_identity.uri) try: while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: self.state = 'failed' return else: if notification.name == 'SIPInvitationTransferDidStart': self.state = 'starting' refer_to_uri = SIPURI.new(target) refer_to_uri.headers = {} refer_to_uri.parameters = {} notification_center.post_notification('SIPSessionTransferNewIncoming', self.session, NotificationData(transfer_destination=refer_to_uri)) elif notification.name == 'SIPSessionTransferDidStart': break elif notification.name == 'SIPSessionTransferDidFail': self.state = 'failed' try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: return while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return self.state = 'started' transfer_info = TransferInfo(referred_by=origin) try: replaces_hdr = target.headers.pop('Replaces') call_id, rest = replaces_hdr.split(';', 1) params = dict((item.split('=') for item in rest.split(';'))) to_tag = params.get('to-tag') from_tag = params.get('from-tag') except (KeyError, ValueError): pass else: transfer_info.replaced_dialog_id = DialogID(call_id, local_tag=from_tag, remote_tag=to_tag) settings = SIPSimpleSettings() account = self.session.account if account is BonjourAccount(): uri = target elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = target lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=0, reason="DNS lookup failed: {}".format(e))) try: self.session._invitation.notify_transfer_progress(480) except SIPCoreError: return while True: try: self._data_channel.wait() except SIPInvitationTransferDidFail: return return self.new_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.new_session) self.new_session.connect(ToHeader(target), routes=routes, streams=[MediaStreamRegistry.AudioStream()], transfer_info=transfer_info) while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return if notification.name == 'SIPInvitationTransferDidEnd': return except proc.ProcExit: if self.new_session is not None: notification_center.remove_observer(self, sender=self.new_session) self.new_session = None raise def _CH_outgoing_transfer(self, command): self.direction = 'outgoing' notification_center = NotificationCenter() self.state = 'starting' while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason)) return if notification.name == 'SIPInvitationTransferDidStart': self.state = 'started' notification_center.post_notification('SIPSessionTransferDidStart', sender=self.session) elif notification.name == 'SIPInvitationTransferDidEnd': self.state = 'ended' self.session.end() notification_center.post_notification('SIPSessionTransferDidEnd', sender=self.session) return def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session._invitation) notification_center.remove_observer(self, sender=self.session) self._proc.kill() self._proc = None self._command_channel = None self._data_channel = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._command_channel.send(Command('incoming_transfer', data=notification.data)) def _NH_SIPInvitationTransferNewOutgoing(self, notification): self._command_channel.send(Command('outgoing_transfer', data=notification.data)) def _NH_SIPInvitationTransferDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferDidFail(self, notification): self._data_channel.send_exception(SIPInvitationTransferDidFail(notification.data)) def _NH_SIPInvitationTransferDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferGotNotify(self, notification): if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') notification.center.post_notification('SIPSessionTransferGotProgress', sender=self.session, data=NotificationData(code=code, reason=reason)) def _NH_SIPSessionTransferDidStart(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionTransferDidFail(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionGotRingIndication(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(180) except SIPCoreError: pass def _NH_SIPSessionGotProvisionalResponse(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: pass def _NH_SIPSessionDidStart(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(200) except SIPCoreError: pass self.state = 'ended' self.session.end() def _NH_SIPSessionDidEnd(self, notification): if notification.sender is self.new_session: # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=500, reason='internal error')) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(500) except SIPCoreError: pass self.state = 'failed' else: self._terminate() def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=notification.data.code or 500, reason=notification.data.reason)) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(notification.data.code or 500, notification.data.reason) except SIPCoreError: pass self.state = 'failed' else: self._terminate() class OptionalTag(str): def __eq__(self, other): return other is None or super(OptionalTag, self).__eq__(other) def __ne__(self, other): return not self == other def __repr__(self): return '{}({})'.format(self.__class__.__name__, super(OptionalTag, self).__repr__()) @implementer(IObserver) class SessionReplaceHandler(object): def __init__(self, session): self.session = session def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session.replaced_session) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session.end() self.session.replaced_session = None self.session = None def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.session: notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session = None self.session = None _NH_SIPSessionDidEnd = _NH_SIPSessionDidFail @implementer(IObserver) class Session(object): media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self.replaced_session = None self.transfer_handler = None self.transfer_info = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): notification_center = NotificationCenter() remote_sdp = invitation.sdp.proposed_remote self.proposed_streams = [] if remote_sdp: for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True if 'Referred-By' in data.headers or 'Replaces' in data.headers: self.transfer_info = TransferInfo() if 'Referred-By' in data.headers: self.transfer_info.referred_by = data.headers['Referred-By'].body if 'Replaces' in data.headers: replaces_header = data.headers.get('Replaces') # Because we only allow the remote tag to be optional, it can only match established dialogs and early outgoing dialogs, but not early incoming dialogs, # which according to RFC3891 should be rejected with 481 (which will happen automatically by never matching them). if replaces_header.early_only or replaces_header.from_tag == '0': replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=OptionalTag(replaces_header.from_tag)) else: replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) session_manager = SessionManager() try: replaced_session = next(session for session in session_manager.sessions if session.dialog_id == replaced_dialog_id) except StopIteration: invitation.send_response(481) return else: # Any matched dialog at this point is either established, terminated or early outgoing. if replaced_session.state in ('terminating', 'terminated'): invitation.send_response(603) return elif replaced_session.dialog_id.remote_tag is not None and replaces_header.early_only: # The replaced dialog is established, but the early-only flag is set invitation.send_response(486) return self.replaced_session = replaced_session self.transfer_info.replaced_dialog_id = replaced_dialog_id replace_handler = SessionReplaceHandler(self) replace_handler.start() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams[:], headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, to_header, routes, streams, is_focus=False, transfer_info=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() - self._local_identity = FromHeader(self.account.uri, self.account.display_name) + display_name = self.account.display_name.decode() if self.account.display_name else None + self._local_identity = FromHeader(self.account.uri, display_name) self._remote_identity = to_header self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) self.transfer_info = transfer_info notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', sender=self, data=NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 try: contact_uri = self.account.contact[PublicGRUUIfAvailable, self.route] local_ip = host.outgoing_ip_for(self.route.address) if local_ip is None: raise ValueError("could not get outgoing IP address") except (KeyError, ValueError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return connection = SDPConnection(local_ip.encode()) local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode()) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) - from_header = FromHeader(self.account.uri, self.account.display_name.decode()) + + display_name = self.account.display_name.decode() if self.account.display_name else None + from_header = FromHeader(self.account.uri, display_name) route_header = RouteHeader(self.route.uri) contact_header = ContactHeader(contact_uri) if is_focus: contact_header.parameters['isfocus'] = None if self.transfer_info is not None: if self.transfer_info.referred_by is not None: extra_headers.append(Header('Referred-By', self.transfer_info.referred_by)) if self.transfer_info.replaced_dialog_id is not None: dialog_id = self.transfer_info.replaced_dialog_id extra_headers.append(ReplacesHeader(dialog_id.call_id, dialog_id.local_tag, dialog_id.remote_tag)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, self.account.credentials, extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: # TODO: check if port is also 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) for notification in invitation_notifications: self._channel.send(notification) while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed-out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: # TODO: we should know *exactly* when there are set -Saul code = getattr(e.data, 'code', 0) reason = getattr(e.data, 'reason', 'Session disconnected') if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and add_stream self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') if self.proposed_streams: for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') else: for index, stream in enumerate(streams): notification_center.add_observer(self, sender=stream) stream.index = index stream.initialize(self, direction='outgoing') self.proposed_streams = streams wait_count = len(self.proposed_streams) try: while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote sdp_connection = remote_sdp.connection or next((media.connection for media in remote_sdp.media if media.connection is not None)) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip.encode()) local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode()) if remote_sdp: stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: print('session.py: we will get local media') # TODO: broken for RTP streams here media = stream.get_local_media(remote_sdp=remote_sdp, index=index) print('session.py: we got local media %s' % media) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: media = SDPMediaStream.new(media.encode()) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] print('Added media to SDP: %s' % media) local_sdp.media.append(media) else: for index, stream in enumerate(self.proposed_streams): stream.index = index print('session.py: we will get local media 2') media = stream.get_local_media(remote_sdp=None, index=index) print('session.py: we got local media 2 %s' % media) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection print('Added media to SDP: %s' % media) local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[PublicGRUU, self._invitation.transport] except KeyError: pass else: contact_header.uri = local_contact_uri if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: if not connected: # we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: if self._invitation.state == 'connecting': ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown' # pjsip's invite session object does not inform us whether the ACK was received or not notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received)) elif self._invitation.state == 'connected' and not connected: # we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed-out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=getattr(e.data, 'method', 'INVITE'), code=200, reason='OK')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received)) break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party self.greenlet = None except SIPCoreError as e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) finally: self.greenlet = None @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind of # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except api.TimeoutError: self._fail_proposal(originator='remote', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.streams = self.streams + streams self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown')) break except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code >= 300: proposed_streams = self.proposed_streams for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except api.TimeoutError: self._fail_proposal(originator='local', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 487: proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) except InvitationDisconnectedError as e: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: self.greenlet = None if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return elif self._invitation.state is None: # The invitation was built but never sent streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() stream.end() self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if notification.data.disconnect_reason in ('internal error', 'missing ACK'): pass elif notification.data.disconnect_reason == 'timeout': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local' if self.direction=='outgoing' else 'remote', method='INVITE', code=408, reason='Timeout')) elif cancelling: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif hasattr(notification.data, 'method'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200])) elif notification.data.disconnect_reason == 'user request': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason)) break except SIPCoreError as e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) except InvitationDisconnectedError as e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=code, reason=reason)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @check_state(['connected']) @check_transfer_state(None, None) @run_in_twisted_thread def transfer(self, target_uri, replaced_session=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferNewOutgoing', self, NotificationData(transfer_destination=target_uri)) try: self._invitation.transfer(target_uri, replaced_session.dialog_id if replaced_session is not None else None) except SIPCoreError as e: notification_center.post_notification('SIPSessionTransferDidFail', sender=self, data=NotificationData(code=500, reason=str(e))) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def accept_transfer(self): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidStart', sender=self) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def reject_transfer(self, code=603, reason=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidFail', self, NotificationData(code=code, reason=reason or sip_status_messages[code])) @property def dialog_id(self): return self._invitation.dialog_id if self._invitation is not None else None @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400<=code<=699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if prev_inv_state in ('connecting', 'connected'): if notification.data.disconnect_reason in ('timeout', 'missing ACK'): sip_code = 200 sip_reason = 'OK' originator = 'local' elif hasattr(notification.data, 'method'): sip_code = 200 sip_reason = 'OK' originator = 'remote' else: sip_code = notification.data.code sip_reason = notification.data.reason originator = 'local' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=originator, method='BYE', code=sip_code, reason=sip_reason)) elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'): ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=reason, ack_received=ack_received)) elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled')) break except SIPCoreError: pass except api.TimeoutError: if prev_inv_state in ('connecting', 'connected'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408])) notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if self.proposed_streams else 500) except SIPCoreError: pass else: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown')) notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: proposed_streams = self.proposed_streams self.proposed_streams = None self.state = 'connected' notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: self.state = 'terminated' notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if notification.data.originator == 'remote': if hasattr(notification.data, 'method'): notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200])) else: notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason)) self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() @implementer(IObserver) class SessionManager(object, metaclass=Singleton): def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: notification.sender.send_response(404) return notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification) diff --git a/sipsimple/streams/rtp/__init__.py b/sipsimple/streams/rtp/__init__.py index e5316c1f..7cc6e73b 100644 --- a/sipsimple/streams/rtp/__init__.py +++ b/sipsimple/streams/rtp/__init__.py @@ -1,618 +1,619 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and RFC5245. """ __all__ = ['RTPStream'] import weakref from abc import ABCMeta, abstractmethod from application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy from application.python import Null from threading import RLock from zope.interface import implementer from sipsimple.account import BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import RTPTransport, SIPCoreError, SIPURI from sipsimple.lookup import DNSLookup from sipsimple.streams import IMediaStream, InvalidStreamError, MediaStreamType, UnknownStreamError from sipsimple.threading import run_in_thread @implementer(IObserver) class ZRTPStreamOptions(object): def __init__(self, stream): self._stream = stream self.__dict__['master'] = None self.__dict__['sas'] = None self.__dict__['verified'] = False self.__dict__['peer_name'] = '' @property def sas(self): if self.master is not None: return self.master.encryption.zrtp.sas return self.__dict__['sas'] @property def verified(self): if self.master is not None: return self.master.encryption.zrtp.verified return self.__dict__['verified'] @verified.setter def verified(self, verified): if self.__dict__['verified'] == verified: return if self.sas is None: raise AttributeError('Cannot verify peer before SAS is received') if self.master is not None: self.master.encryption.zrtp.verified = verified else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_verified(rtp_transport, verified): rtp_transport.set_zrtp_sas_verified(verified) notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPVerifiedStateChanged', sender=self._stream, data=NotificationData(verified=verified)) self.__dict__['verified'] = verified update_verified(rtp_transport, verified) @property def peer_id(self): if self.master is not None: return self.master.encryption.zrtp.peer_id rtp_transport = self._stream._rtp_transport if rtp_transport is None: return None return rtp_transport.zrtp_peer_id @property def peer_name(self): if self.master is not None: return self.master.encryption.zrtp.peer_name return self.__dict__['peer_name'] @peer_name.setter def peer_name(self, name): if self.__dict__['peer_name'] == name: return if self.master is not None: self.master.encryption.zrtp.peer_name = name else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_name(rtp_transport, name): rtp_transport.zrtp_peer_name = name notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPPeerNameChanged', sender=self._stream, data=NotificationData(name=name)) self.__dict__['peer_name'] = name update_name(rtp_transport, name) @property def master(self): return self.__dict__['master'] @master.setter def master(self, master): old_master = self.__dict__['master'] if old_master is master: return notification_center = NotificationCenter() if old_master is not None: notification_center.remove_observer(self, sender=old_master) if master is not None: notification_center.add_observer(self, sender=master) self.__dict__['master'] = master def _enable(self, master_stream=None): rtp_transport = self._stream._rtp_transport if rtp_transport is None: return if master_stream is not None and not (master_stream.encryption.active and master_stream.encryption.type == 'ZRTP'): raise RuntimeError('Master stream must have ZRTP encryption activated') rtp_transport.set_zrtp_enabled(True, master_stream) self.master = master_stream def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamZRTPReceivedSAS(self, notification): # ZRTP begins on the audio stream, so this notification will only be processed # by the other streams self.__dict__['sas'] = notification.data.sas self.__dict__['verified'] = notification.data.verified self.__dict__['peer_name'] = notification.data.peer_name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPVerifiedStateChanged(self, notification): self.__dict__['verified'] = notification.data.verified notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPPeerNameChanged(self, notification): self.__dict__['peer_name'] = notification.data.name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_MediaStreamDidEnd(self, notification): self.master = None @implementer(IObserver) class RTPStreamEncryption(object): def __init__(self, stream): self._stream_ref = weakref.ref(stream) # Keep a weak reference before the stream is initialized to avoid a memory cycle that would delay releasing audio resources self._stream = None # We will store the actual reference once it's initialized and we're guaranteed to get MediaStreamDidEnd and do the cleanup self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None notification_center = NotificationCenter() notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') @property def active(self): rtp_transport = self._rtp_transport if rtp_transport is None: return False if self.type == 'SRTP/SDES': return rtp_transport.srtp_active elif self.type == 'ZRTP': return rtp_transport.zrtp_active return False @property def cipher(self): rtp_transport = self._rtp_transport if rtp_transport is None: return None if self.type == 'SRTP/SDES': return rtp_transport.srtp_cipher elif self.type == 'ZRTP': return rtp_transport.zrtp_cipher return None @property def type(self): return self.__dict__['type'] @property def zrtp(self): zrtp = self.__dict__['zrtp'] if zrtp is None: raise RuntimeError('ZRTP options have not been initialized') return zrtp def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidInitialize(self, notification): stream = notification.sender if stream is self._stream_ref(): self._stream = stream self._rtp_transport = stream._rtp_transport notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') notification.center.add_observer(self, sender=self._stream) notification.center.add_observer(self, sender=self._rtp_transport) encryption = stream._srtp_encryption or '' if encryption.startswith('sdes'): self.__dict__['type'] = 'SRTP/SDES' elif encryption == 'zrtp': self.__dict__['type'] = 'ZRTP' self.__dict__['zrtp'] = ZRTPStreamOptions(stream) def _NH_MediaStreamDidNotInitialize(self, notification): if notification.sender is self._stream_ref(): notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') self._stream_ref = None self._stream = None def _NH_MediaStreamDidStart(self, notification): if self.type == 'SRTP/SDES': stream = notification.sender if self.active: notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) else: reason = 'Not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_MediaStreamDidEnd(self, notification): notification.center.remove_observer(self, sender=self._stream) notification.center.remove_observer(self, sender=self._rtp_transport) self._stream = None self._stream_ref = None self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None def _NH_RTPTransportZRTPSecureOn(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) def _NH_RTPTransportZRTPSecureOff(self, notification): # We should never get here because we don't allow disabling encryption -Saul pass def _NH_RTPTransportZRTPReceivedSAS(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return self.zrtp.__dict__['sas'] = sas = notification.data.sas self.zrtp.__dict__['verified'] = verified = notification.data.verified self.zrtp.__dict__['peer_name'] = peer_name = notification.sender.zrtp_peer_name notification.center.post_notification('RTPStreamZRTPReceivedSAS', sender=stream, data=NotificationData(sas=sas, verified=verified, peer_name=peer_name)) def _NH_RTPTransportZRTPLog(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamZRTPLog', sender=stream, data=notification.data) def _NH_RTPTransportZRTPNegotiationFailed(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'Negotiation failed: %s' % notification.data.reason notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_RTPTransportZRTPNotSupportedByRemote(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'ZRTP not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) class RTPStreamType(ABCMeta, MediaStreamType): pass @implementer(IMediaStream, IObserver) class RTPStream(object, metaclass=RTPStreamType): type = None priority = None hold_supported = True def __init__(self): self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = "NULL" self.session = None self.encryption = RTPStreamEncryption(self) self._transport = None self._hold_request = None self._ice_state = "NULL" self._lock = RLock() self._rtp_transport = None self._try_ice = False self._srtp_encryption = None self._remote_rtp_address_sdp = None self._remote_rtp_port_sdp = None self._initialized = False self._done = False self._failure_reason = None @property def codec(self): return self._transport.codec if self._transport else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def local_rtp_candidate(self): return self._rtp_transport.local_rtp_candidate if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_address if self._rtp_transport else None return self._remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_port if self._rtp_transport else None return self._remote_rtp_port_sdp if self._rtp_transport else None @property def remote_rtp_candidate(self): return self._rtp_transport.remote_rtp_candidate if self._rtp_transport else None @property def ice_active(self): return self._ice_state == "IN_USE" @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote @abstractmethod def start(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def update(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def validate_update(self, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def deactivate(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def reset(self, stream_index): raise NotImplementedError def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == "ESTABLISHED" and self.direction != "inactive": self._pause() self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == "ESTABLISHED" and self._hold_request == 'hold': self._resume() self._hold_request = None if self._hold_request == 'hold' else 'unhold' @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] if remote_stream.media != cls.type: raise UnknownStreamError - if remote_stream.transport not in ('RTP/AVP', 'RTP/SAVP'): - raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in %s stream, got %s" % (cls.type, remote_stream.transport)) + if remote_stream.transport not in (b'RTP/AVP', b'RTP/SAVP'): + raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in %s stream, got %s" % (cls.type, remote_stream.transport.decode())) local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None - if local_encryption_policy == "sdes_mandatory" and not "crypto" in remote_stream.attributes: + if local_encryption_policy == "sdes_mandatory" and not b"crypto" in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") - if remote_stream.transport == 'RTP/SAVP' and "crypto" in remote_stream.attributes and local_encryption_policy not in ("opportunistic", "sdes_optional", "sdes_mandatory"): + if remote_stream.transport == b'RTP/SAVP' and b"crypto" in remote_stream.attributes and local_encryption_policy not in ("opportunistic", "sdes_optional", "sdes_mandatory"): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") account_preferred_codecs = getattr(session.account.rtp, '%s_codec_list' % cls.type) general_codecs = getattr(settings.rtp, '%s_codec_list' % cls.type) supported_codecs = account_preferred_codecs or general_codecs if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError("no compatible codecs found") stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index return stream def initialize(self, session, direction): with self._lock: if self.state != "NULL": raise RuntimeError("%sStream.initialize() may only be called in the NULL state" % self.type.capitalize()) self.state = "INITIALIZING" self.session = session local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None if hasattr(self, "_incoming_remote_sdp") and hasattr(self, '_incoming_stream_index'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = self.session.account.nat_traversal.use_ice and ((remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates) if "zrtp-hash" in remote_stream.attributes: incoming_stream_encryption = 'zrtp' elif "crypto" in remote_stream.attributes: incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: incoming_stream_encryption = None if incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy else: self._try_ice = self.session.account.nat_traversal.use_ice self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy if self._try_ice: if self.session.account.nat_traversal.stun_server_list: stun_servers = list((server.host, server.port) for server in self.session.account.nat_traversal.stun_server_list) self._init_rtp_transport(stun_servers) elif not isinstance(self.session.account, BonjourAccount): dns_lookup = DNSLookup() self.notification_center.add_observer(self, sender=dns_lookup) dns_lookup.lookup_service(SIPURI(self.session.account.id.domain), "stun") else: self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ("INITIALIZED", "WAIT_ICE", "ESTABLISHED"): raise RuntimeError("%sStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states" % self.type.capitalize()) if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = "sendrecv" - elif "send" in old_direction: + elif b"send" in old_direction: new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv") else: new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly") else: new_direction = None - print('RTP stream init will get_local_media %s %s' % (index, new_direction)) + new_direction = new_direction.encode() if new_direction else None return self._transport.get_local_media(remote_sdp, index, new_direction) # Notifications def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport() def _NH_DNSLookupDidSucceed(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport(notification.data.result) def _NH_RTPTransportDidInitialize(self, notification): rtp_transport = notification.sender with self._lock: if self.state == "ENDED": self.notification_center.remove_observer(self, sender=rtp_transport) return del self._rtp_args del self._stun_servers remote_sdp = self.__dict__.pop('_incoming_remote_sdp', None) stream_index = self.__dict__.pop('_incoming_stream_index', None) try: if remote_sdp is not None: transport = self._create_transport(rtp_transport, remote_sdp=remote_sdp, stream_index=stream_index) self._save_remote_sdp_rtp_info(remote_sdp, stream_index) else: transport = self._create_transport(rtp_transport) except SIPCoreError as e: self.state = "ENDED" self.notification_center.remove_observer(self, sender=rtp_transport) self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = transport self.notification_center.add_observer(self, sender=transport) self._initialized = True self.state = "INITIALIZED" self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPTransportDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != "NULL" or self.state not in ("INITIALIZING", "INITIALIZED", "WAIT_ICE"): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "IN_USE" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "FAILED" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args["encryption"] = self._srtp_encryption self._rtp_args["use_ice"] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() try: + stun_address = stun_address.encode() if stun_address else None rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) except SIPCoreError as e: self._try_next_rtp_transport(e.args[0]) else: self.notification_center.add_observer(self, sender=rtp_transport) try: rtp_transport.set_INIT() except SIPCoreError as e: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _save_remote_sdp_rtp_info(self, remote_sdp, index): connection = remote_sdp.media[index].connection or remote_sdp.connection self._remote_rtp_address_sdp = connection.address self._remote_rtp_port_sdp = remote_sdp.media[index].port @abstractmethod def _create_transport(self, rtp_transport, remote_sdp=None, stream_index=None): raise NotImplementedError @abstractmethod def _check_hold(self, direction, is_initial): raise NotImplementedError @abstractmethod def _pause(self): raise NotImplementedError @abstractmethod def _resume(self): raise NotImplementedError from sipsimple.streams.rtp import audio, video diff --git a/sipsimple/streams/rtp/audio.py b/sipsimple/streams/rtp/audio.py index 98b4c119..1096c905 100644 --- a/sipsimple/streams/rtp/audio.py +++ b/sipsimple/streams/rtp/audio.py @@ -1,239 +1,239 @@ __all__ = ['AudioStream'] from application.notification import NotificationCenter, NotificationData from zope.interface import implementer from sipsimple.audio import AudioBridge, AudioDevice, IAudioPort, WaveRecorder from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import AudioTransport, PJSIPError, SIPCoreError from sipsimple.streams.rtp import RTPStream @implementer(IAudioPort) class AudioStream(RTPStream): type = 'audio' priority = 1 def __init__(self): super(AudioStream, self).__init__() from sipsimple.application import SIPApplication self.mixer = SIPApplication.voice_audio_mixer self.bridge = AudioBridge(self.mixer) self.device = AudioDevice(self.mixer) self._audio_rec = None self.bridge.add(self.device) @property def muted(self): return self.__dict__.get('muted', False) @muted.setter def muted(self, value): if not isinstance(value, bool): raise ValueError("illegal value for muted property: %r" % (value,)) if value == self.muted: return old_producer_slot = self.producer_slot self.__dict__['muted'] = value notification_center = NotificationCenter() data = NotificationData(consumer_slot_changed=False, producer_slot_changed=True, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot) notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=data) @property def consumer_slot(self): return self._transport.slot if self._transport else None @property def producer_slot(self): return self._transport.slot if self._transport and not self.muted else None @property def recorder(self): return self._audio_rec def start(self, local_sdp, remote_sdp, stream_index): with self._lock: if self.state != "INITIALIZED": raise RuntimeError("AudioStream.start() may only be called in the INITIALIZED state") settings = SIPSimpleSettings() self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self._save_remote_sdp_rtp_info(remote_sdp, stream_index) - self._check_hold(self._transport.direction, True) + self._check_hold(self._transport.direction.decode(), True) if self._try_ice and self._ice_state == "NULL": self.state = 'WAIT_ICE' else: self.state = 'ESTABLISHED' self.notification_center.post_notification('MediaStreamDidStart', sender=self) def validate_update(self, remote_sdp, stream_index): with self._lock: # TODO: implement return True def update(self, local_sdp, remote_sdp, stream_index): with self._lock: connection = remote_sdp.media[stream_index].connection or remote_sdp.connection if not self._rtp_transport.ice_active and (connection.address != self._remote_rtp_address_sdp or self._remote_rtp_port_sdp != remote_sdp.media[stream_index].port): settings = SIPSimpleSettings() if self._audio_rec is not None: self.bridge.remove(self._audio_rec) old_consumer_slot = self.consumer_slot old_producer_slot = self.producer_slot self.notification_center.remove_observer(self, sender=self._transport) self._transport.stop() available_codecs = self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list codecs = list(c.encode() for c in available_codecs) try: self._transport = AudioTransport(self.mixer, self._rtp_transport, remote_sdp, stream_index, codecs=codecs) except SIPCoreError as e: self.state = "ENDED" self._failure_reason = e.args[0] self.notification_center.post_notification('MediaStreamDidFail', sender=self, data=NotificationData(context='update', reason=self._failure_reason)) return self.notification_center.add_observer(self, sender=self._transport) self._transport.start(local_sdp, remote_sdp, stream_index, timeout=settings.rtp.timeout) self.notification_center.post_notification('AudioPortDidChangeSlots', sender=self, data=NotificationData(consumer_slot_changed=True, producer_slot_changed=True, old_consumer_slot=old_consumer_slot, new_consumer_slot=self.consumer_slot, old_producer_slot=old_producer_slot, new_producer_slot=self.producer_slot)) - if connection.address == '0.0.0.0' and remote_sdp.media[stream_index].direction == 'sendrecv': - self._transport.update_direction('recvonly') - self._check_hold(self._transport.direction, False) + if connection.address == b'0.0.0.0' and remote_sdp.media[stream_index].direction == b'sendrecv': + self._transport.update_direction(b'recvonly') + self._check_hold(self._transport.direction.decode(), False) self.notification_center.post_notification('RTPStreamDidChangeRTPParameters', sender=self) else: new_direction = local_sdp.media[stream_index].direction self._transport.update_direction(new_direction) - self._check_hold(new_direction, False) + self._check_hold(new_direction.decode(), False) self._save_remote_sdp_rtp_info(remote_sdp, stream_index) self._transport.update_sdp(local_sdp, remote_sdp, stream_index) self._hold_request = None def deactivate(self): with self._lock: self.bridge.stop() def end(self): with self._lock: if self.state == "ENDED" or self._done: return self._done = True if not self._initialized: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason='Interrupted')) return self.notification_center.post_notification('MediaStreamWillEnd', sender=self) if self._transport is not None: if self._audio_rec is not None: self._stop_recording() self.notification_center.remove_observer(self, sender=self._transport) self.notification_center.remove_observer(self, sender=self._rtp_transport) self._transport.stop() self._transport = None self._rtp_transport = None self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidEnd', sender=self, data=NotificationData(error=self._failure_reason)) self.session = None def reset(self, stream_index): with self._lock: if self.direction == "inactive" and not self.on_hold_by_local: - new_direction = "sendrecv" + new_direction = b"sendrecv" self._transport.update_direction(new_direction) - self._check_hold(new_direction, False) + self._check_hold(new_direction.decode(), False) # TODO: do a full reset, re-creating the AudioTransport, so that a new offer # would contain all codecs and ICE would be renegotiated -Saul def send_dtmf(self, digit): with self._lock: if self.state != "ESTABLISHED": raise RuntimeError("AudioStream.send_dtmf() cannot be used in %s state" % self.state) try: self._transport.send_dtmf(digit) except PJSIPError as e: if not e.args[0].endswith("(PJ_ETOOMANY)"): raise def start_recording(self, filename): with self._lock: if self.state == "ENDED": raise RuntimeError("AudioStream.start_recording() may not be called in the ENDED state") if self._audio_rec is not None: raise RuntimeError("Already recording audio to a file") self._audio_rec = WaveRecorder(self.mixer, filename) if self.state == "ESTABLISHED": self._check_recording() def stop_recording(self): with self._lock: if self._audio_rec is None: raise RuntimeError("Not recording any audio") self._stop_recording() def _NH_RTPAudioStreamGotDTMF(self, notification): notification.center.post_notification('AudioStreamGotDTMF', sender=self, data=NotificationData(digit=notification.data.digit)) def _NH_RTPAudioTransportDidTimeout(self, notification): notification.center.post_notification('RTPStreamDidTimeout', sender=self) # Private methods # def _create_transport(self, rtp_transport, remote_sdp=None, stream_index=None): settings = SIPSimpleSettings() available_codecs = self.session.account.rtp.audio_codec_list or settings.rtp.audio_codec_list codecs = list(c.encode() for c in available_codecs) return AudioTransport(self.mixer, rtp_transport, remote_sdp=remote_sdp, sdp_index=stream_index or 0, codecs=codecs) def _check_hold(self, direction, is_initial): was_on_hold_by_local = self.on_hold_by_local was_on_hold_by_remote = self.on_hold_by_remote was_inactive = self.direction == "inactive" self.direction = direction inactive = self.direction == "inactive" self.on_hold_by_local = was_on_hold_by_local if inactive else direction == "sendonly" self.on_hold_by_remote = "send" not in direction if (is_initial or was_on_hold_by_local or was_inactive) and not inactive and not self.on_hold_by_local and self._hold_request != 'hold': self._resume() if not was_on_hold_by_local and self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=True)) if was_on_hold_by_local and not self.on_hold_by_local: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator="local", on_hold=False)) if not was_on_hold_by_remote and self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=True)) if was_on_hold_by_remote and not self.on_hold_by_remote: self.notification_center.post_notification('RTPStreamDidChangeHoldState', sender=self, data=NotificationData(originator="remote", on_hold=False)) if self._audio_rec is not None: self._check_recording() def _check_recording(self): if not self._audio_rec.is_active: self.notification_center.post_notification('AudioStreamWillStartRecording', sender=self, data=NotificationData(filename=self._audio_rec.filename)) try: self._audio_rec.start() except SIPCoreError as e: self._audio_rec = None self.notification_center.post_notification('AudioStreamDidStopRecording', sender=self, data=NotificationData(filename=self._audio_rec.filename, reason=e.args[0])) return self.notification_center.post_notification('AudioStreamDidStartRecording', sender=self, data=NotificationData(filename=self._audio_rec.filename)) if not self.on_hold: self.bridge.add(self._audio_rec) elif self._audio_rec in self.bridge: self.bridge.remove(self._audio_rec) def _stop_recording(self): self.notification_center.post_notification('AudioStreamWillStopRecording', sender=self, data=NotificationData(filename=self._audio_rec.filename)) try: if self._audio_rec.is_active: self._audio_rec.stop() finally: self.notification_center.post_notification('AudioStreamDidStopRecording', sender=self, data=NotificationData(filename=self._audio_rec.filename)) self._audio_rec = None def _pause(self): self.bridge.remove(self) def _resume(self): self.bridge.add(self)